Skip to main content
Glama

@arizeai/phoenix-mcp

Official
by Arize-ai
test_spans.py83.7 kB
from __future__ import annotations import json from datetime import datetime, timedelta, timezone from random import choice, random, sample from secrets import token_hex from typing import Any, Sequence, cast import pandas as pd import pytest from strawberry.relay import GlobalID from typing_extensions import TypeAlias from phoenix.client.__generated__ import v1 from .._helpers import ( _ADMIN, # pyright: ignore[reportPrivateUsage] _MEMBER, # pyright: ignore[reportPrivateUsage] _AppInfo, # pyright: ignore[reportPrivateUsage] _await_or_return, # pyright: ignore[reportPrivateUsage] _ExistingProject, _ExistingSpan, _GetUser, # pyright: ignore[reportPrivateUsage] _gql, _RoleOrUser, _until_spans_exist, ) # Type aliases for better readability SpanId: TypeAlias = str SpanGlobalId: TypeAlias = str # Type ignore for pandas operations that work correctly at runtime # pyright: reportUnknownMemberType=false # pyright: reportUnknownVariableType=false # pyright: reportUnknownArgumentType=false class TestClientForSpanAnnotationsRetrieval: @pytest.mark.parametrize("is_async", [True, False]) @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_get_span_annotations_dataframe_and_list( self, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: num_spans = len(_existing_spans) assert num_spans >= 2, "At least two existing spans are required for this test" existing_span1, (_, span_id2, *_) = sample(_existing_spans, 2) span_id1 = existing_span1.span_id project_name = existing_span1.trace.project.name user = _get_user(_app, role_or_user).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] annotation_name_1 = f"test_anno_{token_hex(4)}" annotation_name_2 = f"test_anno_{token_hex(4)}" score1 = random() score2 = random() label1 = token_hex(4) label2 = token_hex(4) explanation1 = token_hex(8) explanation2 = token_hex(8) await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.add_span_annotation( annotation_name=annotation_name_1, span_id=span_id1, annotator_kind="LLM", label=label1, score=score1, explanation=explanation1, sync=True, ) ) await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.add_span_annotation( annotation_name=annotation_name_2, span_id=span_id2, annotator_kind="CODE", label=label2, score=score2, explanation=explanation2, sync=True, ) ) df = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_span_annotations_dataframe( span_ids=[span_id1, span_id2], project_identifier=project_name, ) ) assert isinstance(df, pd.DataFrame) assert { span_id1, span_id2, }.issubset(set(df.index.astype(str))), "Expected span IDs missing from dataframe" # pyright: ignore[reportUnknownArgumentType,reportUnknownMemberType] annotations = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_span_annotations( span_ids=[span_id1, span_id1, span_id2], # include duplicate on purpose project_identifier=project_name, ) ) assert isinstance(annotations, list) assert all(isinstance(a, dict) for a in annotations) by_key: dict[tuple[str, str], v1.SpanAnnotation] = { (a["span_id"], a["name"]): a for a in annotations } key1, key2 = (span_id1, annotation_name_1), (span_id2, annotation_name_2) assert key1 in by_key, "Annotation for span 1 missing from list response" assert key2 in by_key, "Annotation for span 2 missing from list response" anno1, anno2 = by_key[key1], by_key[key2] for anno, expected_label, expected_score, expected_explanation in ( (anno1, label1, score1, explanation1), (anno2, label2, score2, explanation2), ): assert "result" in anno, "Expected 'result' key in span annotation response" res = anno["result"] assert isinstance(res, dict) assert res.get("label") == expected_label assert abs(float(res.get("score", 0.0)) - expected_score) < 1e-6 assert res.get("explanation") == expected_explanation spans_input_df = pd.DataFrame({"context.span_id": [span_id1, span_id2]}) df_from_df = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_span_annotations_dataframe( spans_dataframe=spans_input_df, project_identifier=project_name, ) ) assert isinstance(df_from_df, pd.DataFrame) for sid, aname, label, scr, expl in ( (span_id1, annotation_name_1, label1, score1, explanation1), (span_id2, annotation_name_2, label2, score2, explanation2), ): subset = df_from_df[df_from_df.index.astype(str) == sid] # pyright: ignore[reportUnknownMemberType,reportUnknownVariableType] subset = subset[subset["annotation_name"] == aname] # pyright: ignore[reportUnknownVariableType] assert not subset.empty # pyright: ignore[reportUnknownMemberType] row = subset.iloc[0] # pyright: ignore[reportUnknownVariableType,reportUnknownMemberType] assert "result.label" in row assert row.loc["result.label"] == label assert abs(float(row.loc["result.score"]) - scr) < 1e-6 # pyright: ignore[reportUnknownArgumentType] assert row.loc["result.explanation"] == expl @pytest.mark.parametrize("is_async", [True, False]) @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_note_annotations_filtering_behavior( self, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: num_spans = len(_existing_spans) assert num_spans >= 2, "At least two existing spans are required for this test" existing_span1, (_, span_id2, *_) = sample(_existing_spans, 2) span_id1 = existing_span1.span_id project_name = existing_span1.trace.project.name user = _get_user(_app, role_or_user).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] regular_annotation_name = f"test_anno_{token_hex(4)}" score1 = random() label1 = token_hex(4) explanation1 = token_hex(8) await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.add_span_annotation( annotation_name=regular_annotation_name, span_id=span_id1, annotator_kind="LLM", label=label1, score=score1, explanation=explanation1, sync=True, ) ) df_default = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_span_annotations_dataframe( span_ids=[span_id1, span_id2], project_identifier=project_name, ) ) assert isinstance(df_default, pd.DataFrame) if not df_default.empty: annotation_names_default = set(df_default["annotation_name"].tolist()) # pyright: ignore[reportUnknownVariableType,reportUnknownArgumentType] assert regular_annotation_name in annotation_names_default df_with_notes = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_span_annotations_dataframe( span_ids=[span_id1, span_id2], project_identifier=project_name, include_annotation_names=[regular_annotation_name, "note"], ) ) assert isinstance(df_with_notes, pd.DataFrame) if not df_with_notes.empty: annotation_names_with_notes = set(df_with_notes["annotation_name"].tolist()) # pyright: ignore[reportUnknownVariableType,reportUnknownArgumentType] assert regular_annotation_name in annotation_names_with_notes df_excluded = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_span_annotations_dataframe( span_ids=[span_id1, span_id2], project_identifier=project_name, exclude_annotation_names=[regular_annotation_name], ) ) assert isinstance(df_excluded, pd.DataFrame) if not df_excluded.empty: annotation_names_excluded = set(df_excluded["annotation_name"].tolist()) # pyright: ignore[reportUnknownVariableType,reportUnknownArgumentType] assert regular_annotation_name not in annotation_names_excluded annotations_default = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_span_annotations( span_ids=[span_id1, span_id2], project_identifier=project_name, ) ) assert isinstance(annotations_default, list) if annotations_default: default_names = {a["name"] for a in annotations_default} assert regular_annotation_name in default_names annotations_with_notes = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_span_annotations( span_ids=[span_id1, span_id2], project_identifier=project_name, include_annotation_names=[regular_annotation_name, "note"], ) ) assert isinstance(annotations_with_notes, list) if annotations_with_notes: with_notes_names = {a["name"] for a in annotations_with_notes} assert regular_annotation_name in with_notes_names def test_invalid_arguments_validation( self, _existing_project: _ExistingProject, _app: _AppInfo, ) -> None: """Supplying multiple or no parameters should error.""" from phoenix.client import Client spans_client = Client(base_url=_app.base_url).spans project_name = _existing_project.name # Test get_span_annotations_dataframe with pytest.raises(ValueError): spans_client.get_span_annotations_dataframe(project_identifier=project_name) dummy_df = pd.DataFrame() with pytest.raises(ValueError): spans_client.get_span_annotations_dataframe( spans_dataframe=dummy_df, span_ids=["abc"], project_identifier=project_name, ) # Create complete v1.Span objects for testing test_span_1 = cast( v1.Span, { "id": "test_1", "name": "test_span_no_id", "context": {"trace_id": "trace123", "span_id": "abc"}, "span_kind": "INTERNAL", "start_time": "2023-01-01T00:00:00Z", "end_time": "2023-01-01T00:01:00Z", "status_code": "OK", }, ) test_span_2 = cast( v1.Span, { "id": "test_2", "name": "valid_span", "context": {"trace_id": "trace456", "span_id": "def"}, "span_kind": "INTERNAL", "start_time": "2023-01-01T00:00:00Z", "end_time": "2023-01-01T00:01:00Z", "status_code": "OK", }, ) with pytest.raises(ValueError): spans_client.get_span_annotations_dataframe( spans_dataframe=dummy_df, spans=[test_span_1], project_identifier=project_name, ) with pytest.raises(ValueError): spans_client.get_span_annotations_dataframe( span_ids=["abc"], spans=[test_span_2], project_identifier=project_name, ) # Test get_span_annotations with pytest.raises(ValueError): spans_client.get_span_annotations(project_identifier=project_name) with pytest.raises(ValueError): spans_client.get_span_annotations( span_ids=["abc"], spans=[test_span_2], project_identifier=project_name, ) @pytest.mark.parametrize("is_async", [True, False]) @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_get_span_annotations_with_spans_objects( self, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test getting span annotations using Span objects from get_spans.""" num_spans = len(_existing_spans) assert num_spans >= 2, "At least two existing spans are required for this test" existing_span1, (_, span_id2, *_) = sample(_existing_spans, 2) span_id1 = existing_span1.span_id project_name = existing_span1.trace.project.name user = _get_user(_app, role_or_user).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] annotation_name_1 = f"test_spans_obj_{token_hex(4)}" annotation_name_2 = f"test_spans_obj_{token_hex(4)}" score1 = 0.8 score2 = 0.6 label1 = "positive" label2 = "negative" # Add annotations to specific spans await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.add_span_annotation( annotation_name=annotation_name_1, span_id=span_id1, annotator_kind="LLM", label=label1, score=score1, sync=True, ) ) await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.add_span_annotation( annotation_name=annotation_name_2, span_id=span_id2, annotator_kind="CODE", label=label2, score=score2, sync=True, ) ) # Get spans using the new get_spans method spans = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=num_spans, ) ) assert len(spans) == num_spans, "Should retrieve all existing spans" # Filter to only the spans we're interested in target_spans = [s for s in spans if s["context"]["span_id"] in [span_id1, span_id2]] assert len(target_spans) == 2, "Should find the two test spans" # Test get_span_annotations_dataframe with spans objects df = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_span_annotations_dataframe( spans=target_spans, project_identifier=project_name, include_annotation_names=[annotation_name_1, annotation_name_2], ) ) assert isinstance(df, pd.DataFrame) assert len(df) == 2, "Should have annotations for both spans" # Test get_span_annotations with spans objects annotations = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_span_annotations( spans=target_spans, project_identifier=project_name, include_annotation_names=[annotation_name_1, annotation_name_2], ) ) assert isinstance(annotations, list) assert len(annotations) == 2, "Should have annotations for both spans" # Verify the annotations contain our test data by_span_name = {(a["span_id"], a["name"]): a for a in annotations} key1 = (span_id1, annotation_name_1) key2 = (span_id2, annotation_name_2) assert key1 in by_span_name, f"Annotation {key1} not found" assert key2 in by_span_name, f"Annotation {key2} not found" # Test with spans that have missing span_ids (should not cause errors) spans_with_missing_ids: list[v1.Span] = [ cast( v1.Span, { "id": "test_missing_1", "name": "test_span_no_id", "context": {"trace_id": "trace789", "span_id": ""}, # Empty span_id "span_kind": "INTERNAL", "start_time": "2023-01-01T00:00:00Z", "end_time": "2023-01-01T00:01:00Z", "status_code": "OK", }, ), cast( v1.Span, { "id": "test_valid_1", "name": "valid_span", "context": {"trace_id": "trace999", "span_id": span_id1}, "span_kind": "INTERNAL", "start_time": "2023-01-01T00:00:00Z", "end_time": "2023-01-01T00:01:00Z", "status_code": "OK", }, ), ] annotations_filtered = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_span_annotations( spans=spans_with_missing_ids, project_identifier=project_name, ) ) # Should only get annotations for the span with valid span_id span_ids_found = {a["span_id"] for a in annotations_filtered} assert span_id1 in span_ids_found assert len([a for a in annotations_filtered if a["span_id"] == span_id1]) >= 1 class TestClientForSpansRetrieval: """Test the get_spans method with various filtering and pagination options.""" @pytest.mark.parametrize("is_async", [True, False]) @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_basic_span_retrieval( self, is_async: bool, role_or_user: _RoleOrUser, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test basic span retrieval returns ergonomic span format.""" num_spans = len(_existing_spans) assert num_spans >= 1, "At least one existing span is required for this test" existing_span1 = choice(_existing_spans) project_name = existing_span1.trace.project.name user = _get_user(_app, role_or_user).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] spans = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=num_spans, ) ) assert isinstance(spans, list) assert len(spans) == num_spans # Each span should be a dict with the expected structure for span in spans: assert isinstance(span, dict) # Check required fields exist according to v1.Span assert "id" in span assert "name" in span assert "context" in span assert "span_kind" in span assert "start_time" in span assert "end_time" in span assert "status_code" in span # Check context structure context = span["context"] assert isinstance(context, dict) assert "trace_id" in context assert "span_id" in context assert isinstance(context["trace_id"], str) assert isinstance(context["span_id"], str) @pytest.mark.parametrize("is_async", [True, False]) async def test_time_filtering( self, is_async: bool, _existing_project: _ExistingProject, _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test that start_time and end_time filters work correctly.""" user = _get_user(_app, _MEMBER).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] # Create test spans with known, distinct timestamps base_time = datetime.now(timezone.utc) trace_id = f"trace_time_filter_{token_hex(16)}" # Create spans with 10-second intervals to ensure distinct timestamps test_spans: list[v1.Span] = [] span_times: list[datetime] = [] num_spans = 5 for i in range(num_spans): span_time = base_time + timedelta(seconds=i * 10) span_times.append(span_time) span = cast( v1.Span, { "name": f"time_filter_span_{i}", "context": { "trace_id": trace_id, "span_id": f"span_time_{token_hex(8)}_{i}", }, "span_kind": "CHAIN", "start_time": span_time.isoformat(), "end_time": (span_time + timedelta(seconds=1)).isoformat(), "status_code": "OK", "attributes": {"time_index": i}, }, ) test_spans.append(span) project_name = _existing_project.name # Create the test spans create_result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_spans( # pyright: ignore[reportAttributeAccessIssue] project_identifier=project_name, spans=test_spans, ) ) assert create_result["total_queued"] == num_spans, ( f"Failed to create test spans: {create_result}" ) # Wait for spans to be processed await _until_spans_exist(_app, [s["context"]["span_id"] for s in test_spans]) # Test 1: Filter to get only middle spans (index 1, 2, 3) middle_start = span_times[1] - timedelta(seconds=1) # Just before span 1 middle_end = span_times[3] + timedelta(seconds=2) # Just after span 3 middle_spans = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, start_time=middle_start, end_time=middle_end, limit=num_spans, ) ) # Filter to only our test spans our_middle_spans = [s for s in middle_spans if s["context"]["trace_id"] == trace_id] # Should have exactly 3 spans (indices 1, 2, 3) assert len(our_middle_spans) == 3, ( f"Expected 3 spans but got {len(our_middle_spans)}. " f"Span names: {[s['name'] for s in our_middle_spans]}" ) # Verify they are the correct spans found_indices: set[int] = set() for span in our_middle_spans: if "attributes" in span and "time_index" in span["attributes"]: found_indices.add(span["attributes"]["time_index"]) assert found_indices == {1, 2, 3}, f"Expected indices {{1, 2, 3}} but got {found_indices}" # Test 2: Filter to get only later spans (index 3, 4) later_start = span_times[3] - timedelta(seconds=1) # Just before span 3 later_spans = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, start_time=later_start, limit=num_spans, ) ) # Filter to only our test spans our_later_spans = [s for s in later_spans if s["context"]["trace_id"] == trace_id] # Should have exactly 2 spans (indices 3, 4) assert len(our_later_spans) == 2, ( f"Expected 2 spans but got {len(our_later_spans)}. " f"Span names: {[s['name'] for s in our_later_spans]}" ) # Test 3: Filter to get only earlier spans (index 0, 1) earlier_end = span_times[1] + timedelta(seconds=2) # Just after span 1 earlier_spans = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, end_time=earlier_end, limit=num_spans, ) ) # Filter to only our test spans our_earlier_spans = [s for s in earlier_spans if s["context"]["trace_id"] == trace_id] # Should have exactly 2 spans (indices 0, 1) assert len(our_earlier_spans) == 2, ( f"Expected 2 spans but got {len(our_earlier_spans)}. " f"Span names: {[s['name'] for s in our_earlier_spans]}" ) @pytest.mark.parametrize("is_async", [True, False]) async def test_automatic_pagination( self, is_async: bool, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test that the method automatically handles pagination to fetch up to the limit.""" assert len(_existing_spans) >= 102, "At least 102 spans are required for this test" existing_span1 = choice(_existing_spans) project_name = existing_span1.trace.project.name user = _get_user(_app, _MEMBER).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] # The method uses page_size = min(100, limit), so test with limit > 100 # to ensure pagination happens (if there are enough spans) limit = 101 spans = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=limit, ) ) # We should get up to the limit, or all available spans assert len(spans) == limit # Test with small limit small_limit = 5 small_spans = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=small_limit, ) ) assert len(small_spans) == small_limit @pytest.mark.parametrize("is_async", [True, False]) async def test_project_identifier_types( self, is_async: bool, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test that project identifier works with both project names and IDs.""" assert _existing_spans, "At least one existing span is required for this test" existing_span1 = choice(_existing_spans) project_id = str(existing_span1.trace.project.id) project_name = existing_span1.trace.project.name user = _get_user(_app, _ADMIN).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] # Get spans by project name spans_by_name = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=5, ) ) # Get spans by project ID spans_by_id = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_id, limit=5, ) ) # Both should return spans assert len(spans_by_name) > 0 assert len(spans_by_id) > 0 @pytest.mark.parametrize("is_async", [True, False]) async def test_span_structure( self, is_async: bool, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: assert _existing_spans, "At least one existing span is required for this test" existing_span = choice(_existing_spans) project_name = existing_span.trace.project.name user = _get_user(_app, _MEMBER).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] spans = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=10, ) ) assert len(spans) > 0 for span in spans: # Check required fields from v1.Span assert "id" in span assert "name" in span assert "context" in span assert "span_kind" in span assert "start_time" in span assert "end_time" in span assert "status_code" in span # Check datetime fields are strings (ISO format) assert isinstance(span["start_time"], str) assert isinstance(span["end_time"], str) # Check context structure context = span["context"] assert isinstance(context, dict) assert "trace_id" in context assert "span_id" in context assert isinstance(context["trace_id"], str) assert isinstance(context["span_id"], str) # Check optional attributes if "attributes" in span: assert isinstance(span["attributes"], dict) # Check events if present if "events" in span: assert isinstance(span["events"], list) for event in span["events"]: assert isinstance(event, dict) assert "name" in event assert "timestamp" in event assert isinstance(event["timestamp"], str) if "attributes" in event: assert isinstance(event["attributes"], dict) @pytest.mark.parametrize("is_async", [True, False]) async def test_empty_results( self, is_async: bool, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test behavior when no spans match the filter criteria.""" assert _existing_spans, "At least one existing span is required for this test" existing_span = choice(_existing_spans) project_name = existing_span.trace.project.name user = _get_user(_app, _MEMBER).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] # Use a far future date that shouldn't have any spans far_future = datetime.now(timezone.utc) + timedelta(days=365 * 10) spans = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, start_time=far_future, limit=10, ) ) # Should return empty list, not error assert isinstance(spans, list) assert len(spans) == 0 @pytest.mark.parametrize("is_async", [True, False]) async def test_invalid_project_identifier( self, is_async: bool, _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test error handling for invalid project identifier.""" user = _get_user(_app, _MEMBER).log_in(_app) api_key = str(user.create_api_key(_app)) import httpx from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] # Test with non-existent project with pytest.raises(httpx.HTTPStatusError): await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier="non_existent_project_xyz_123", limit=10, ) ) @pytest.mark.parametrize("is_async", [True, False]) async def test_client_get_spans( self, is_async: bool, _existing_spans: Sequence[_ExistingSpan], _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test the get_spans method returns spans correctly.""" num_spans = len(_existing_spans) assert num_spans >= 2, "At least two existing spans are required for this test" existing_span1, (_, span_id2, *_) = sample(_existing_spans, 2) span_id1 = existing_span1.span_id project_name = existing_span1.trace.project.name user = _get_user(_app, _MEMBER).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] all_spans = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=num_spans ) ) span_ids_found = {span["context"]["span_id"] for span in all_spans} assert span_id1 in span_ids_found, f"Expected span {span_id1} not found in {span_ids_found}" assert span_id2 in span_ids_found, f"Expected span {span_id2} not found in {span_ids_found}" limited_spans = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=1 ) ) assert len(limited_spans) == 1, "Limit parameter should be respected" # Each span should have required fields for span in all_spans: assert "id" in span assert "name" in span assert "context" in span assert "span_id" in span["context"] assert "trace_id" in span["context"] class TestClientForSpanCreation: def _create_test_span(self, name: str = "", **kwargs: Any) -> v1.Span: """Helper to create a standard test span with optional customizations.""" defaults: dict[str, Any] = { "name": name or f"test_span_{token_hex(4)}", "context": { "trace_id": f"trace_{token_hex(16)}", "span_id": f"span_{token_hex(8)}", }, "span_kind": "CHAIN", "start_time": datetime.now(timezone.utc).isoformat(), "end_time": (datetime.now(timezone.utc) + timedelta(seconds=1)).isoformat(), "status_code": "OK", } # Merge kwargs into defaults result: dict[str, Any] = {**defaults, **kwargs} if "context" in kwargs: result["context"] = {**defaults["context"], **kwargs["context"]} return cast(v1.Span, result) async def test_basic_span_operations( self, _existing_project: _ExistingProject, _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test basic span creation, duplicates, and error handling in one efficient test.""" user = _get_user(_app, _MEMBER).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import Client from phoenix.client.exceptions import SpanCreationError client = Client(base_url=_app.base_url, api_key=api_key) # Test 1: Basic span creation with parent-child relationship trace_id = f"trace_{token_hex(16)}" parent_span_id = f"span_{token_hex(8)}" child_span_id = f"span_{token_hex(8)}" parent_span = self._create_test_span( "parent", context={"trace_id": trace_id, "span_id": parent_span_id}, attributes={"test_attr": "parent_value"}, ) child_span = self._create_test_span( "child", context={"trace_id": trace_id, "span_id": child_span_id}, parent_id=parent_span_id, span_kind="LLM", attributes={"test_attr": "child_value"}, ) project_name = _existing_project.name num_spans = 2 # Create spans successfully result = client.spans.log_spans( # pyright: ignore[reportAttributeAccessIssue] project_identifier=project_name, spans=[parent_span, child_span], ) assert result["total_received"] == num_spans assert result["total_queued"] == num_spans await _until_spans_exist( _app, [parent_span["context"]["span_id"], child_span["context"]["span_id"]] ) # Test 2: Duplicate span rejection with pytest.raises(SpanCreationError) as exc_info: client.spans.log_spans( # pyright: ignore[reportAttributeAccessIssue] project_identifier=project_name, spans=[parent_span], # Duplicate ) error = exc_info.value assert error.total_duplicates == 1 assert error.total_queued == 0 # Test 3: Invalid span handling invalid_span = self._create_test_span( "invalid", start_time="invalid-datetime-format", ) with pytest.raises(SpanCreationError) as exc_info: client.spans.log_spans( # pyright: ignore[reportAttributeAccessIssue] project_identifier=project_name, spans=[invalid_span], ) error = exc_info.value assert error.total_invalid == 1 assert error.total_queued == 0 # Test 4: Non-existent project name is accepted (may be auto-created) # Only GlobalIDs are validated for existence result_nonexistent = client.spans.log_spans( # pyright: ignore[reportAttributeAccessIssue] project_identifier="non_existent_project_xyz", spans=[self._create_test_span("test")], ) assert result_nonexistent["total_received"] == 1 assert result_nonexistent["total_queued"] == 1 # Test 5: Error handling for non-existent project by GlobalID import httpx with pytest.raises(httpx.HTTPStatusError) as http_exc_info: client.spans.log_spans( # pyright: ignore[reportAttributeAccessIssue] project_identifier=str(GlobalID("Project", "999999")), # Non-existent GlobalID spans=[self._create_test_span("test")], ) assert http_exc_info.value.response.status_code == 404 @pytest.mark.parametrize("is_async", [True, False]) async def test_helper_functions_round_trip( self, is_async: bool, _existing_project: _ExistingProject, _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test round-tripping spans through helper functions efficiently.""" # Use deterministic seed to avoid random collisions import random original_state = random.getstate() try: random.seed(100 + (1 if is_async else 0)) user = _get_user(_app, _MEMBER).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient from phoenix.client.helpers.spans import ( dataframe_to_spans, uniquify_spans, uniquify_spans_dataframe, ) Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] # Create test spans with rich attributes for round-trip testing trace_id = f"trace_{token_hex(16)}" test_spans = [ self._create_test_span( "roundtrip_parent", context={"trace_id": trace_id, "span_id": f"span_{token_hex(8)}"}, attributes={ "input.value": "test input", "output.value": "test output", "metadata.key": "metadata_value", }, events=[ { "name": "test_event", "timestamp": datetime.now(timezone.utc).isoformat(), "attributes": {"event_attr": "event_value"}, } ], ), self._create_test_span( "roundtrip_child", context={"trace_id": trace_id, "span_id": f"span_{token_hex(8)}"}, span_kind="LLM", attributes={"llm.model": "gpt-4", "llm.temperature": 0.7}, ), ] # Set parent relationship test_spans[1] = cast( v1.Span, {**test_spans[1], "parent_id": test_spans[0]["context"]["span_id"]} ) project_name = _existing_project.name num_spans = len(test_spans) # Create original spans result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_spans( # pyright: ignore[reportAttributeAccessIssue] project_identifier=project_name, spans=test_spans, ) ) assert result["total_queued"] == num_spans await _until_spans_exist(_app, [span["context"]["span_id"] for span in test_spans]) df = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans_dataframe( project_identifier=project_name, limit=num_spans, ) ) # Filter to our test spans our_spans_mask = df["context.trace_id"] == trace_id our_df = df[our_spans_mask].copy() # Test 1: DataFrame to spans conversion reconstructed_spans = dataframe_to_spans(our_df) assert len(reconstructed_spans) == num_spans spans_by_name = {span["name"]: span for span in reconstructed_spans} assert "roundtrip_parent" in spans_by_name assert "roundtrip_child" in spans_by_name parent = spans_by_name["roundtrip_parent"] child = spans_by_name["roundtrip_child"] # Verify structure preservation assert parent["span_kind"] == "CHAIN" assert child["span_kind"] == "LLM" assert parent["context"]["trace_id"] == trace_id assert child["context"]["trace_id"] == trace_id assert child.get("parent_id") == parent["context"]["span_id"] # Test 2: uniquify_spans with spans reconstructed_spans[0]["context"]["trace_id"] [s["context"]["span_id"] for s in reconstructed_spans] unique_spans = uniquify_spans(reconstructed_spans, in_place=False) assert len(unique_spans) == len(reconstructed_spans) # Verify IDs changed but structure preserved for original, unique in zip(reconstructed_spans, unique_spans): assert unique["context"]["trace_id"] != original["context"]["trace_id"] assert unique["context"]["span_id"] != original["context"]["span_id"] assert unique["name"] == original["name"] assert unique["span_kind"] == original["span_kind"] # Test 3: uniquify_spans with DataFrame original_df_trace_ids = our_df["context.trace_id"].tolist() # pyright: ignore[reportUnknownVariableType] original_df_span_ids = our_df["context.span_id"].tolist() # pyright: ignore[reportUnknownVariableType] unique_df = uniquify_spans_dataframe(our_df, in_place=False) # Verify DataFrame uniquification assert unique_df["context.trace_id"].tolist() != original_df_trace_ids assert unique_df["context.span_id"].tolist() != original_df_span_ids assert len(unique_df["context.span_id"].unique()) == len(unique_df) # pyright: ignore[reportUnknownMemberType,reportUnknownArgumentType] # Verify parent-child relationships preserved in DataFrame parent_mask = unique_df["name"] == "roundtrip_parent" child_mask = unique_df["name"] == "roundtrip_child" if parent_mask.any() and child_mask.any(): # pyright: ignore[reportUnknownMemberType] new_parent_span_id = unique_df[parent_mask].index[0] # pyright: ignore[reportUnknownVariableType,reportUnknownMemberType] child_parent_id = unique_df[child_mask]["parent_id"].iloc[0] # pyright: ignore[reportUnknownVariableType,reportUnknownMemberType] assert child_parent_id == new_parent_span_id # Test 4: Full round-trip - DataFrame -> spans -> uniquify -> create final_spans = dataframe_to_spans(unique_df) final_unique_spans = uniquify_spans(final_spans, in_place=False) # Verify final spans are valid and can be created assert len(final_unique_spans) == num_spans for span in final_unique_spans: assert "context" in span assert "span_id" in span["context"] assert "trace_id" in span["context"] assert span["context"]["span_id"] not in original_df_span_ids assert span["context"]["trace_id"] not in original_df_trace_ids finally: random.setstate(original_state) @pytest.mark.parametrize("is_async", [True, False]) async def test_batch_creation_and_retrieval( self, is_async: bool, _existing_project: _ExistingProject, _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test efficient batch span creation and retrieval.""" user = _get_user(_app, _ADMIN).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] project_name = _existing_project.name num_spans = 5 trace_id = f"batch_trace_{token_hex(16)}" batch_spans = [ self._create_test_span( f"batch_{i}", context={"trace_id": trace_id, "span_id": f"span_{token_hex(8)}"}, attributes={"batch_index": i}, ) for i in range(num_spans) ] # Create batch result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_spans( # pyright: ignore[reportAttributeAccessIssue] project_identifier=project_name, spans=batch_spans, ) ) assert result["total_received"] == num_spans assert result["total_queued"] == num_spans await _until_spans_exist(_app, [span["context"]["span_id"] for span in batch_spans]) retrieved_spans = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=num_spans, ) ) created_span_ids = {s["context"]["span_id"] for s in batch_spans} retrieved_span_ids = {s["context"]["span_id"] for s in retrieved_spans} assert len(created_span_ids & retrieved_span_ids) >= 0 @pytest.mark.parametrize("is_async", [True, False]) async def test_log_spans_dataframe_roundtrip( self, is_async: bool, _existing_project: _ExistingProject, _get_user: _GetUser, _app: _AppInfo, ) -> None: user = _get_user(_app, _ADMIN).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] # Create comprehensive test DataFrame with nested attributes import pandas as pd trace_id_1 = f"df_roundtrip_{token_hex(16)}" trace_id_2 = f"df_roundtrip_{token_hex(16)}" span_id_1 = f"span_{token_hex(8)}" span_id_2 = f"span_{token_hex(8)}" base_time = datetime.now(timezone.utc) # Create comprehensive test data with nested attributes and complex types test_data: dict[str, Any] = { "name": ["ChatCompletion", "ChatCompletion"], "span_kind": ["LLM", "LLM"], "parent_id": [None, None], "start_time": [ base_time.isoformat(), (base_time + timedelta(milliseconds=100)).isoformat(), ], "end_time": [ (base_time + timedelta(seconds=3, milliseconds=500)).isoformat(), (base_time + timedelta(seconds=3, milliseconds=600)).isoformat(), ], "status_code": ["OK", "OK"], "status_message": ["", ""], "events": [[], []], "context.span_id": [span_id_1, span_id_2], "context.trace_id": [trace_id_1, trace_id_2], # Complex nested attributes "attributes.openinference.span.kind": ["LLM", "LLM"], "attributes.llm.provider": ["openai", "openai"], "attributes.llm.model_name": ["gpt-4o", "gpt-4o"], "attributes.llm.system": ["openai", "openai"], # JSON string attributes "attributes.llm.invocation_parameters": [ ( '{"temperature": 1.0, "frequency_penalty": 0.0, "presence_penalty": 0.0, ' '"top_p": 1.0}' ), ( '{"temperature": 1.0, "frequency_penalty": 0.0, "presence_penalty": 0.0, ' '"top_p": 1.0}' ), ], # Numeric attributes "attributes.llm.token_count.total": [34, 64], "attributes.llm.token_count.prompt": [22, 21], "attributes.llm.token_count.completion": [12, 43], # List attributes with complex objects "attributes.llm.input_messages": [ [ {"message.content": "You are a helpful assistant", "message.role": "system"}, {"message.content": "What is the capital of France?", "message.role": "user"}, ], [ {"message.content": "You are a helpful assistant", "message.role": "system"}, { "message.content": "Explain the concept of machine learning", "message.role": "user", }, ], ], "attributes.llm.output_messages": [ [ { "message.content": "The capital of France is Paris.", "message.role": "assistant", } ], [ { "message.content": ( "Machine learning is a subset of artificial intelligence that " "enables computers to learn and improve from experience without " "being explicitly programmed." ), "message.role": "assistant", } ], ], # Nested object attributes "attributes.url": [ {"full": "https://api.openai.com/v1/chat/completions", "path": "chat/completions"}, {"full": "https://api.openai.com/v1/chat/completions", "path": "chat/completions"}, ], "attributes.llm.prompt_template.variables": [ {"question": "What is the capital of France?"}, {"question": "Explain the concept of machine learning"}, ], # MIME type attributes "attributes.input.mime_type": ["application/json", "application/json"], "attributes.output.mime_type": ["text/plain", "text/plain"], # Complex JSON input/output values "attributes.input.value": [ ( '{"messages": [{"role": "SYSTEM", "content": "You are a helpful assistant"}, ' '{"role": "USER", "content": "{{question}}"}], "model": {"provider_key": ' '"OpenAI", "name": "gpt-4o"}, "template": {"variables": {"question": "What is ' 'the capital of France?"}, "format": "MUSTACHE"}, "prompt_name": null}' ), ( '{"messages": [{"role": "SYSTEM", "content": "You are a helpful assistant"}, ' '{"role": "USER", "content": "{{question}}"}], "model": {"provider_key": ' '"OpenAI", "name": "gpt-4o"}, "template": {"variables": {"question": ' '"Explain the concept of machine learning"}, "format": "MUSTACHE"}, ' '"prompt_name": null}' ), ], "attributes.output.value": [ "The capital of France is Paris.", ( "Machine learning is a subset of artificial intelligence that enables " "computers to learn and improve from experience without being explicitly " "programmed." ), ], } input_df = pd.DataFrame(test_data) # Set index to span_id so dataframe_to_spans uses the right span IDs input_df.set_index("context.span_id", inplace=True, drop=False) input_df.index.name = "span_id" project_name = _existing_project.name num_spans = len(input_df) # Log spans using DataFrame result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_spans_dataframe( # pyright: ignore[reportAttributeAccessIssue] project_identifier=project_name, spans_dataframe=input_df, ) ) assert result["total_queued"] == num_spans await _until_spans_exist(_app, input_df.index.tolist()) output_df = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans_dataframe( project_identifier=project_name, limit=num_spans, ) ) # Filter to our test spans (check both trace IDs since we have different ones) our_spans = output_df[output_df["context.trace_id"].isin([trace_id_1, trace_id_2])] assert len(our_spans) == num_spans # Verify roundtrip data by matching on name (spans may get new IDs) span_names = set(our_spans["name"].tolist()) expected_names = set(input_df["name"].tolist()) assert span_names == expected_names for _, row in our_spans.iterrows(): # Find matching original span by trace_id to handle identical names original_span = input_df[input_df["context.trace_id"] == row["context.trace_id"]] assert len(original_span) > 0 # Use len() instead of .empty for type safety orig_row = original_span.iloc[0] # Verify basic fields assert str(row["span_kind"]) == str(orig_row["span_kind"]) assert str(row["name"]) == str(orig_row["name"]) assert str(row["status_code"]) == str(orig_row["status_code"]) # Verify simple string attributes assert str(row["attributes.llm.provider"]) == str(orig_row["attributes.llm.provider"]) assert str(row["attributes.llm.model_name"]) == str( orig_row["attributes.llm.model_name"] ) assert str(row["attributes.input.mime_type"]) == str( orig_row["attributes.input.mime_type"] ) # Verify numeric attributes assert ( int(row["attributes.llm.token_count.total"]) # pyright: ignore[reportArgumentType] == int(orig_row["attributes.llm.token_count.total"]) # pyright: ignore[reportArgumentType] ) assert ( int(row["attributes.llm.token_count.prompt"]) # pyright: ignore[reportArgumentType] == int(orig_row["attributes.llm.token_count.prompt"]) # pyright: ignore[reportArgumentType] ) # Verify JSON string attributes assert str(row["attributes.llm.invocation_parameters"]) == str( orig_row["attributes.llm.invocation_parameters"] ) assert str(row["attributes.input.value"]) == str(orig_row["attributes.input.value"]) assert str(row["attributes.output.value"]) == str(orig_row["attributes.output.value"]) # Verify complex nested object attributes using JSON comparison for order-independence import ast def compare_nested_objects(val1: Any, val2: Any) -> bool: """Compare complex nested objects, handling different key orders.""" try: # Convert to strings and parse as Python literals if possible if isinstance(val1, str) and isinstance(val2, str): parsed1 = ast.literal_eval(val1) parsed2 = ast.literal_eval(val2) return bool(parsed1 == parsed2) return bool(val1 == val2) except (ValueError, SyntaxError): # If parsing fails, fallback to string comparison return str(val1) == str(val2) assert compare_nested_objects(row["attributes.url"], orig_row["attributes.url"]) assert compare_nested_objects( row["attributes.llm.prompt_template.variables"], orig_row["attributes.llm.prompt_template.variables"], ) # Verify list attributes with complex objects using order-independent comparison assert compare_nested_objects( row["attributes.llm.input_messages"], orig_row["attributes.llm.input_messages"] ) assert compare_nested_objects( row["attributes.llm.output_messages"], orig_row["attributes.llm.output_messages"] ) @pytest.mark.parametrize("is_async", [True, False]) async def test_unflatten_attributes_on_span_creation( self, is_async: bool, _existing_project: _ExistingProject, _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test that flattened attributes are properly unflattened when creating spans. This test verifies that the unflatten() function correctly converts flattened dot-separated keys into nested structures when spans are created. It uses GraphQL to check the raw database structure (nested) and the REST API to verify round-trip behavior (flattened). """ user = _get_user(_app, _MEMBER).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] project_name = _existing_project.name trace_id = f"trace_{token_hex(16)}" span_id = f"span_{token_hex(8)}" # Create a span with MIX of flattened and already-nested attributes # This tests robustness of unflatten() handling both formats test_span = self._create_test_span( "test_unflatten", context={"trace_id": trace_id, "span_id": span_id}, attributes={ # Flattened attributes "llm.model": "gpt-4", "llm.token_count.prompt": 100, "llm.token_count.completion": 50, "llm.token_count.total": 150, # Already nested attribute (should be preserved as-is) "metadata": { "user": {"id": "user123", "name": "Test User"}, "session": {"id": "session456"}, }, # Array-like structure (numeric keys with nested content) "documents.0.content": "First document", "documents.0.id": "doc1", "documents.1.content": "Second document", "documents.1.id": "doc2", }, ) # Create the span result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_spans( # pyright: ignore[reportAttributeAccessIssue] project_identifier=project_name, spans=[test_span], ) ) assert result["total_received"] == 1 assert result["total_queued"] == 1 # Wait for span to be processed await _until_spans_exist(_app, [span_id]) # Use GraphQL to check the nested structure in the database gql_query = """ query GetSpanAttributes($spanId: ID!) { node(id: $spanId) { ... on Span { spanId attributes } } } """ # Get the span's Global ID using GraphQL - query project directly by its Global ID trace_query = """ query GetTraceSpans($projectId: ID!, $traceId: ID!) { node(id: $projectId) { ... on Project { trace(traceId: $traceId) { spans { edges { node { id spanId } } } } } } } """ trace_result, _ = _gql( _app, user, query=trace_query, variables={"projectId": str(_existing_project.id), "traceId": trace_id}, ) assert not trace_result.get("errors"), f"GraphQL errors: {trace_result.get('errors')}" # Find our span's Global ID trace = trace_result["data"]["node"]["trace"] assert trace is not None, f"Could not find trace with traceId {trace_id}" span_global_id = None for span_edge in trace["spans"]["edges"]: if span_edge["node"]["spanId"] == span_id: span_global_id = span_edge["node"]["id"] break assert span_global_id is not None, f"Could not find span with spanId {span_id}" # Now query for the attributes using the Global ID attr_result, _ = _gql(_app, user, query=gql_query, variables={"spanId": span_global_id}) assert not attr_result.get("errors"), f"GraphQL errors: {attr_result.get('errors')}" # Get the nested attributes from GraphQL (as stored in the database) # GraphQL returns the attributes as a JSON string, so we need to parse it db_attrs_json = attr_result["data"]["node"]["attributes"] db_attrs = json.loads(db_attrs_json) # Verify the attributes are stored in nested structure in the database assert isinstance(db_attrs, dict) # Check OpenInference attributes assert "openinference" in db_attrs assert isinstance(db_attrs["openinference"], dict) assert "span" in db_attrs["openinference"] assert isinstance(db_attrs["openinference"]["span"], dict) assert "kind" in db_attrs["openinference"]["span"] assert db_attrs["openinference"]["span"]["kind"] == "CHAIN" # Check LLM attributes assert "llm" in db_attrs assert isinstance(db_attrs["llm"], dict) assert db_attrs["llm"]["model"] == "gpt-4" assert "token_count" in db_attrs["llm"] assert isinstance(db_attrs["llm"]["token_count"], dict) assert db_attrs["llm"]["token_count"]["prompt"] == 100 assert db_attrs["llm"]["token_count"]["completion"] == 50 assert db_attrs["llm"]["token_count"]["total"] == 150 # Check already-nested attributes are preserved assert "metadata" in db_attrs assert isinstance(db_attrs["metadata"], dict) assert "user" in db_attrs["metadata"] assert isinstance(db_attrs["metadata"]["user"], dict) assert db_attrs["metadata"]["user"]["id"] == "user123" assert db_attrs["metadata"]["user"]["name"] == "Test User" assert "session" in db_attrs["metadata"] assert isinstance(db_attrs["metadata"]["session"], dict) assert db_attrs["metadata"]["session"]["id"] == "session456" # Check array-like structures are converted to arrays assert "documents" in db_attrs assert isinstance(db_attrs["documents"], list) assert len(db_attrs["documents"]) == 2 assert db_attrs["documents"][0]["content"] == "First document" assert db_attrs["documents"][0]["id"] == "doc1" assert db_attrs["documents"][1]["content"] == "Second document" assert db_attrs["documents"][1]["id"] == "doc2" # Now verify the REST API round-trip (should be flattened on retrieval) spans = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=100, ) ) # Find our test span test_span_retrieved = None for span in spans: if span["context"]["span_id"] == span_id: test_span_retrieved = span break assert test_span_retrieved is not None, "Test span should be found in retrieved spans" # Verify the attributes are flattened when retrieved via REST API assert "attributes" in test_span_retrieved rest_attrs = test_span_retrieved["attributes"] assert isinstance(rest_attrs, dict) # These should be flattened (dot-separated keys) assert "llm.model" in rest_attrs assert rest_attrs["llm.model"] == "gpt-4" assert "llm.token_count.prompt" in rest_attrs assert rest_attrs["llm.token_count.prompt"] == 100 assert "llm.token_count.completion" in rest_attrs assert rest_attrs["llm.token_count.completion"] == 50 # Verify originally-nested metadata is flattened in REST response assert "metadata.user.id" in rest_attrs assert rest_attrs["metadata.user.id"] == "user123" assert "metadata.session.id" in rest_attrs assert rest_attrs["metadata.session.id"] == "session456" # Verify documents array is flattened back to dotted keys assert "documents.0.content" in rest_attrs assert rest_attrs["documents.0.content"] == "First document" assert "documents.0.id" in rest_attrs assert rest_attrs["documents.0.id"] == "doc1" assert "documents.1.content" in rest_attrs assert rest_attrs["documents.1.content"] == "Second document" class TestClientForSpanDeletion: """Test the delete_span method with various span identifiers.""" @pytest.mark.parametrize("is_async", [True, False]) @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_delete_span_by_opentelemetry_span_id( self, is_async: bool, role_or_user: _RoleOrUser, _existing_project: _ExistingProject, _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test deleting a span by OpenTelemetry span_id.""" user = _get_user(_app, role_or_user).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] project_name = _existing_project.name # Create a test span to delete trace_id = f"trace_delete_{token_hex(16)}" span_id = f"span_delete_{token_hex(8)}" test_span = cast( v1.Span, { "name": "span_to_delete", "context": { "trace_id": trace_id, "span_id": span_id, }, "span_kind": "CHAIN", "start_time": datetime.now(timezone.utc).isoformat(), "end_time": (datetime.now(timezone.utc) + timedelta(seconds=1)).isoformat(), "status_code": "OK", "attributes": {"test_attr": "delete_test"}, }, ) # Create the span create_result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_spans( project_identifier=project_name, spans=[test_span], ) ) assert create_result["total_queued"] == 1 # Wait for span to be processed await _until_spans_exist(_app, [span_id]) # Verify span exists before deletion spans_before = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=1000, ) ) span_ids_before = {s["context"]["span_id"] for s in spans_before} assert span_id in span_ids_before, "Test span should exist before deletion" # Delete the span using OpenTelemetry span_id await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.delete(span_identifier=span_id) ) # Verify span is deleted spans_after = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=1000, ) ) span_ids_after = {s["context"]["span_id"] for s in spans_after} assert span_id not in span_ids_after, "Test span should be deleted" @pytest.mark.parametrize("is_async", [True, False]) @pytest.mark.parametrize("role_or_user", [_MEMBER, _ADMIN]) async def test_delete_span_by_global_id( self, is_async: bool, role_or_user: _RoleOrUser, _existing_project: _ExistingProject, _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test deleting a span by Phoenix Global ID.""" user = _get_user(_app, role_or_user).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] project_name = _existing_project.name # Create a test span to delete trace_id = f"trace_delete_global_{token_hex(16)}" span_id = f"span_delete_global_{token_hex(8)}" test_span = cast( v1.Span, { "name": "span_to_delete_by_global_id", "context": { "trace_id": trace_id, "span_id": span_id, }, "span_kind": "LLM", "start_time": datetime.now(timezone.utc).isoformat(), "end_time": (datetime.now(timezone.utc) + timedelta(seconds=1)).isoformat(), "status_code": "OK", "attributes": {"test_attr": "global_id_delete_test"}, }, ) # Create the span create_result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_spans( project_identifier=project_name, spans=[test_span], ) ) assert create_result["total_queued"] == 1 # Wait for span to be processed await _until_spans_exist(_app, [span_id]) # Get the span to retrieve its Global ID spans_before = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=1000, ) ) # Find our test span test_span_data = None for span in spans_before: if span["context"]["span_id"] == span_id: test_span_data = span break assert test_span_data is not None, "Test span should exist before deletion" span_global_id = test_span_data.get("id") # This is the Phoenix Global ID assert span_global_id, "Span should have a global ID" # Delete the span using Phoenix Global ID await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.delete( span_identifier=span_global_id ) ) # Verify span is deleted spans_after = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=1000, ) ) span_ids_after = {s["context"]["span_id"] for s in spans_after} assert span_id not in span_ids_after, "Test span should be deleted" @pytest.mark.parametrize("is_async", [True, False]) async def test_delete_span_orphaned_children_behavior( self, is_async: bool, _existing_project: _ExistingProject, _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test that deleting a parent span orphans its children but doesn't delete them.""" user = _get_user(_app, _ADMIN).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] project_name = _existing_project.name # Create parent and child spans trace_id = f"trace_orphan_{token_hex(16)}" parent_span_id = f"parent_{token_hex(8)}" child_span_id = f"child_{token_hex(8)}" parent_span = cast( v1.Span, { "name": "parent_span", "context": { "trace_id": trace_id, "span_id": parent_span_id, }, "span_kind": "CHAIN", "start_time": datetime.now(timezone.utc).isoformat(), "end_time": (datetime.now(timezone.utc) + timedelta(seconds=2)).isoformat(), "status_code": "OK", "attributes": {"span_type": "parent"}, }, ) child_span = cast( v1.Span, { "name": "child_span", "context": { "trace_id": trace_id, "span_id": child_span_id, }, "span_kind": "LLM", "parent_id": parent_span_id, # Set parent relationship "start_time": ( datetime.now(timezone.utc) + timedelta(milliseconds=100) ).isoformat(), "end_time": (datetime.now(timezone.utc) + timedelta(seconds=1)).isoformat(), "status_code": "OK", "attributes": {"span_type": "child"}, }, ) # Create both spans create_result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_spans( project_identifier=project_name, spans=[parent_span, child_span], ) ) assert create_result["total_queued"] == 2 # Wait for spans to be processed await _until_spans_exist(_app, [parent_span_id, child_span_id]) # Verify both spans exist before deletion spans_before = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=1000, ) ) span_ids_before = {s["context"]["span_id"] for s in spans_before} assert parent_span_id in span_ids_before, "Parent span should exist before deletion" assert child_span_id in span_ids_before, "Child span should exist before deletion" # Verify parent-child relationship child_span_before = next( s for s in spans_before if s["context"]["span_id"] == child_span_id ) assert child_span_before.get("parent_id") == parent_span_id, ( "Child should have correct parent_id" ) # Delete the parent span await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.delete( span_identifier=parent_span_id ) ) # Verify parent is deleted but child remains (orphaned) spans_after = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=1000, ) ) span_ids_after = {s["context"]["span_id"] for s in spans_after} assert parent_span_id not in span_ids_after, "Parent span should be deleted" assert child_span_id in span_ids_after, "Child span should remain (orphaned)" # Verify child is now orphaned (parent_id points to non-existent span) child_span_after = next(s for s in spans_after if s["context"]["span_id"] == child_span_id) assert child_span_after.get("parent_id") == parent_span_id, ( "Child should still reference deleted parent (orphaned)" ) @pytest.mark.parametrize("is_async", [True, False]) async def test_delete_nonexistent_span( self, is_async: bool, _existing_project: _ExistingProject, _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test error handling when trying to delete a non-existent span.""" user = _get_user(_app, _ADMIN).log_in(_app) api_key = str(user.create_api_key(_app)) import httpx from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] nonexistent_span_id = f"nonexistent_{token_hex(8)}" # Attempt to delete non-existent span should raise 404 with pytest.raises(httpx.HTTPStatusError) as exc_info: await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.delete( span_identifier=nonexistent_span_id ) ) assert exc_info.value.response.status_code == 404 @pytest.mark.parametrize("is_async", [True, False]) async def test_delete_span_timeout_parameter( self, is_async: bool, _existing_project: _ExistingProject, _get_user: _GetUser, _app: _AppInfo, ) -> None: """Test that timeout parameter is properly handled.""" user = _get_user(_app, _ADMIN).log_in(_app) api_key = str(user.create_api_key(_app)) from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient # type: ignore[unused-ignore] project_name = _existing_project.name # Create a test span to delete trace_id = f"trace_timeout_{token_hex(16)}" span_id = f"span_timeout_{token_hex(8)}" test_span: v1.Span = v1.Span( name="timeout_test_span", context={ "trace_id": trace_id, "span_id": span_id, }, span_kind="TOOL", start_time=datetime.now(timezone.utc).isoformat(), end_time=(datetime.now(timezone.utc) + timedelta(seconds=1)).isoformat(), status_code="OK", attributes={"test_attr": "timeout_test"}, ) # Create the span create_result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.log_spans( project_identifier=project_name, spans=[test_span], ) ) assert create_result["total_queued"] == 1 # Wait for span to be processed await _until_spans_exist(_app, [span_id]) # Delete with custom timeout (should work normally) await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.delete( span_identifier=span_id, timeout=10, # Custom timeout ) ) # Verify span is deleted spans_after = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).spans.get_spans( project_identifier=project_name, limit=1000, ) ) span_ids_after = {s["context"]["span_id"] for s in spans_after} assert span_id not in span_ids_after, "Test span should be deleted"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arize-ai/phoenix'

If you have feedback or need assistance with the MCP directory API, please join our Discord server