Skip to main content
Glama

@arizeai/phoenix-mcp

Official
by Arize-ai
test_experiments.py119 kB
from __future__ import annotations import asyncio from datetime import datetime, timezone from secrets import token_hex from typing import Any, Callable, Dict, Iterator, Optional, Sequence, Union, cast from unittest.mock import patch import pytest from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from strawberry.relay import GlobalID from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient from phoenix.client.__generated__ import v1 from phoenix.client.resources.datasets import Dataset from .._helpers import ( # pyright: ignore[reportPrivateUsage] _AppInfo, _await_or_return, _gql, _httpx_client, ) class _ExperimentTestHelper: """ General-purpose helper for experiment integration tests. Provides HTTP-based CRUD operations for datasets, experiments, runs, and evaluations. """ def __init__(self, _app: _AppInfo) -> None: self._app = _app self.http_client = _httpx_client(_app, _app.admin_secret) self.now = datetime.now(timezone.utc).isoformat() self._created_datasets: list[str] = [] def post_json(self, url: str, **kwargs: Any) -> Any: """Helper to POST and return parsed JSON response data.""" response = self.http_client.post(url, **kwargs) response.raise_for_status() return response.json()["data"] def get_data(self, url: str) -> Any: """Helper to GET and return parsed JSON response data.""" response = self.http_client.get(url) response.raise_for_status() return response.json()["data"] def create_dataset( self, inputs: list[dict[str, Any]], outputs: list[dict[str, Any]] ) -> tuple[str, list[v1.DatasetExample]]: """Create a dataset with a randomly generated name and return (dataset_id, examples).""" name = f"test_dataset_{token_hex(4)}" upload_result = self.post_json( "v1/datasets/upload?sync=true", json={ "action": "create", "name": name, "inputs": inputs, "outputs": outputs, }, ) dataset_id = upload_result["dataset_id"] self._created_datasets.append(dataset_id) # Get examples examples_data = self.get_data(f"v1/datasets/{dataset_id}/examples") examples = cast(list[v1.DatasetExample], examples_data["examples"]) return dataset_id, examples def create_experiment(self, dataset_id: str, repetitions: int) -> v1.Experiment: """Create an experiment and return the experiment object.""" return cast( v1.Experiment, self.post_json( f"v1/datasets/{dataset_id}/experiments", json={"repetitions": repetitions} ), ) def create_runs( self, exp_id: str, runs: list[tuple[str, int, Optional[str], Optional[str]]], start_time: Optional[str] = None, end_time: Optional[str] = None, ) -> list[v1.CreateExperimentRunResponseBodyData]: """ Create experiment runs. Args: exp_id: Experiment ID runs: List of (example_id, repetition, output, error) tuples start_time: Optional start time (defaults to self.now) end_time: Optional end time (defaults to self.now) """ created_runs: list[v1.CreateExperimentRunResponseBodyData] = [] for example_id, rep, output, error in runs: run_data = self.post_json( f"v1/experiments/{exp_id}/runs", json={ "dataset_example_id": example_id, "repetition_number": rep, "output": output, "error": error, "start_time": start_time or self.now, "end_time": end_time or self.now, }, ) created_runs.append(cast(v1.CreateExperimentRunResponseBodyData, run_data)) return created_runs def get_runs(self, experiment_id: str) -> list[v1.ExperimentRun]: """Get all runs for an experiment.""" return cast(list[v1.ExperimentRun], self.get_data(f"v1/experiments/{experiment_id}/runs")) def create_evaluation( self, run_id: str, name: str, score: Optional[float] = None, error: Optional[str] = None, start_time: Optional[str] = None, end_time: Optional[str] = None, ) -> v1.UpsertExperimentEvaluationResponseBodyData: """ Create a single evaluation annotation. Args: run_id: Experiment run ID name: Evaluator name score: Optional score (if successful) error: Optional error message (if failed) start_time: Optional start time (defaults to self.now) end_time: Optional end time (defaults to self.now) """ payload: dict[str, Any] = { "experiment_run_id": run_id, "name": name, "annotator_kind": "CODE", "start_time": start_time or self.now, "end_time": end_time or self.now, } if error is not None: payload["error"] = error elif score is not None: payload["result"] = {"score": score} else: payload["result"] = {"score": 1.0} # Default success return cast( v1.UpsertExperimentEvaluationResponseBodyData, self.post_json("v1/experiment_evaluations", json=payload), ) def create_evaluations( self, exp_id: str, success_evaluators: list[str], failed_evaluator_names: Optional[list[str]] = None, score: float = 1.0, ) -> None: """ Create evaluation annotations for all runs in an experiment. Args: exp_id: Experiment ID success_evaluators: List of evaluator names to create with success failed_evaluator_names: List of evaluator names to create with errors score: Score to use for successful evaluations (default: 1.0) """ runs = self.get_runs(exp_id) failed_evaluator_names = failed_evaluator_names or [] for run in runs: run_id = run["id"] for eval_name in success_evaluators: self.create_evaluation(run_id, eval_name, score=score) for eval_name in failed_evaluator_names: self.create_evaluation(run_id, eval_name, error="Evaluator failed") def get_experiment_annotations(self, exp_id: str) -> dict[str, Any]: """Fetch experiment runs and annotations via GraphQL with pagination.""" query = """ query GetExperimentRuns($experimentId: ID!, $cursor: String) { node(id: $experimentId) { ... on Experiment { runs(first: 100, after: $cursor) { edges { run: node { annotations { edges { annotation: node { name score error } } } } } pageInfo { hasNextPage endCursor } } } } } """ # Fetch all runs by following pagination all_runs: list[dict[str, Any]] = [] cursor: str | None = None while True: data, _ = _gql( self._app, self._app.admin_secret, query=query, variables={"experimentId": exp_id, "cursor": cursor}, ) node_data = data["data"]["node"] runs_data = node_data["runs"] all_runs.extend(runs_data["edges"]) if not runs_data["pageInfo"]["hasNextPage"]: break cursor = runs_data["pageInfo"]["endCursor"] # Return in the same format as before but with all runs return {"runs": {"edges": all_runs}} @staticmethod def assert_annotations( runs_data: list[dict[str, Any]], expected_count: int, expected_by_run: dict[str, float], ) -> None: """ Assert annotations match expectations. Args: runs_data: List of run edges from GraphQL response expected_count: Expected number of runs expected_by_run: Dict of evaluator_name -> expected_score for each run """ assert len(runs_data) == expected_count, ( f"Expected {expected_count} runs, got {len(runs_data)}" ) for run_edge in runs_data: annotations = run_edge["run"]["annotations"]["edges"] annotations_by_name = { ann["annotation"]["name"]: ann["annotation"] for ann in annotations } assert len(annotations_by_name) == len(expected_by_run), ( f"Expected {len(expected_by_run)} evaluations, got {len(annotations_by_name)}" ) for eval_name, expected_score in expected_by_run.items(): assert eval_name in annotations_by_name, f"Missing evaluator: {eval_name}" annotation = annotations_by_name[eval_name] assert annotation["score"] == expected_score, ( f"{eval_name}: expected score {expected_score}, got {annotation['score']}" ) assert annotation["error"] is None, ( f"{eval_name}: expected no error, got {annotation['error']}" ) def assert_output_by_example( self, experiment_id: str, expected: dict[int, str | list[str | None]], examples: list[v1.DatasetExample], ) -> None: """ Assert that examples have expected outputs. Args: experiment_id: The experiment ID expected: Dict mapping example index to expected output pattern(s). If str, all repetitions should match that pattern. If list[str | None], each repetition should match corresponding pattern. None values in the list skip validation for that repetition. examples: List of dataset examples to get IDs from """ runs = self.get_runs(experiment_id) # Group successful runs by example ID runs_by_example: dict[str, list[v1.ExperimentRun]] = {} for run in runs: if not run.get("error"): example_id = run["dataset_example_id"] runs_by_example.setdefault(example_id, []).append(run) # Sort each example's runs by repetition number for example_runs in runs_by_example.values(): example_runs.sort(key=lambda r: r["repetition_number"]) # Verify expected outputs for idx, pattern in expected.items(): example_id = examples[idx]["id"] example_runs = runs_by_example[example_id] if isinstance(pattern, str): # All repetitions should match this pattern for run in example_runs: output = run["output"] rep_num = run["repetition_number"] assert pattern in str(output) or output == pattern, ( f"Example {idx} rep {rep_num} should match '{pattern}', got: {output}" ) elif isinstance(pattern, list): # Each repetition should match corresponding pattern assert len(example_runs) == len(pattern), ( f"Example {idx} has {len(example_runs)} runs, expected {len(pattern)}" ) for run, expected_pattern in zip(example_runs, pattern): if expected_pattern is None: continue # Skip validation output = run["output"] rep_num = run["repetition_number"] assert expected_pattern in str(output) or output == expected_pattern, ( f"Example {idx} rep {rep_num} should match '{expected_pattern}', got: {output}" ) def clean_up(self) -> None: """Delete all datasets created during testing.""" for dataset_id in self._created_datasets: try: self.http_client.delete(f"v1/datasets/{dataset_id}") except Exception: # Silently ignore cleanup errors pass # Type alias for the setup fixture factory function _SetupExperimentTest = Callable[ [bool], tuple[Union[AsyncClient, SyncClient], "_ExperimentTestHelper"] ] @pytest.fixture def _setup_experiment_test(_app: _AppInfo) -> Iterator[_SetupExperimentTest]: """Fixture that returns a factory function for creating test helpers with automatic cleanup.""" helpers: list[_ExperimentTestHelper] = [] def _setup(is_async: bool) -> tuple[Union[AsyncClient, SyncClient], _ExperimentTestHelper]: Client = AsyncClient if is_async else SyncClient client = Client(base_url=_app.base_url, api_key=_app.admin_secret) helper = _ExperimentTestHelper(_app) helpers.append(helper) return client, helper yield _setup # Cleanup all helpers created during the test for helper in helpers: helper.clean_up() class SpanCapture: """Helper class to capture OpenTelemetry spans during testing.""" def __init__(self) -> None: self.spans: list[ReadableSpan] = [] def clear(self) -> None: self.spans.clear() class CapturingSpanExporter(SpanExporter): """A span exporter that captures spans instead of sending them to a server.""" def __init__(self, capture: SpanCapture) -> None: self.capture = capture def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: self.capture.spans.extend(spans) return SpanExportResult.SUCCESS def shutdown(self) -> None: pass class TestExperimentsIntegration: @pytest.mark.parametrize("is_async", [True, False]) async def test_run_experiment_basic( self, is_async: bool, _app: _AppInfo, ) -> None: api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_experiment_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[ {"question": "What is 2+2?"}, {"question": "What is the capital of France?"}, {"question": "Who wrote Python?"}, ], outputs=[ {"answer": "4"}, {"answer": "Paris"}, {"answer": "Guido van Rossum"}, ], metadata=[ {"category": "math"}, {"category": "geography"}, {"category": "programming"}, ], ) ) def simple_task(input: Dict[str, Any]) -> str: question = input.get("question", "") if "2+2" in question: return "The answer is 4" elif "capital" in question: return "The capital is Paris" else: return "I don't know" result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=simple_task, experiment_name=f"test_experiment_{token_hex(4)}", experiment_description="A simple test experiment", print_summary=False, ) ) assert "experiment_id" in result assert "dataset_id" in result assert "task_runs" in result assert "evaluation_runs" in result assert result["dataset_id"] == dataset.id assert len(result["task_runs"]) == 3 assert len(result["evaluation_runs"]) == 0 @pytest.mark.parametrize("is_async", [True, False]) async def test_run_experiment_creates_proper_spans( self, is_async: bool, _app: _AppInfo, ) -> None: """Test that experiments create proper OpenTelemetry spans with correct attributes.""" api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_span_creation_{token_hex(4)}" # Create a small dataset for testing dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[ {"text": "Hello world"}, {"text": "Python is great"}, ], outputs=[ {"expected": "greeting"}, {"expected": "programming"}, ], metadata=[ {"category": "test"}, {"category": "test"}, ], ) ) def classification_task(input: Dict[str, Any]) -> str: text = input.get("text", "") if "Hello" in text: return "greeting" elif "Python" in text: return "programming" else: return "unknown" def accuracy_evaluator(output: str) -> float: return 1.0 if output in ["greeting", "programming"] else 0.0 span_capture = SpanCapture() capturing_exporter = CapturingSpanExporter(span_capture) from openinference.semconv.resource import ResourceAttributes from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor def mock_get_tracer( project_name: Optional[str] = None, base_url: Optional[str] = None, headers: Optional[Dict[str, str]] = None, ) -> tuple[Any, Any]: resource = Resource( {ResourceAttributes.PROJECT_NAME: project_name} if project_name else {} ) tracer_provider = TracerProvider(resource=resource) span_processor = SimpleSpanProcessor(capturing_exporter) tracer_provider.add_span_processor(span_processor) return tracer_provider.get_tracer(__name__), resource experiments_module = "phoenix.client.resources.experiments" with patch(f"{experiments_module}._get_tracer", side_effect=mock_get_tracer): result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=classification_task, evaluators={"accuracy_evaluator": accuracy_evaluator}, experiment_name=f"test_span_experiment_{token_hex(4)}", print_summary=False, ) ) assert len(result["task_runs"]) == 2 assert len(result["evaluation_runs"]) == 2 assert len(span_capture.spans) > 0, "No spans were captured" task_spans: list[ReadableSpan] = [] eval_spans: list[ReadableSpan] = [] for span in span_capture.spans: if span.attributes is not None: span_kind = span.attributes.get("openinference.span.kind") if span_kind == "CHAIN": task_spans.append(span) elif span_kind == "EVALUATOR": eval_spans.append(span) assert len(task_spans) == 2, f"Expected 2 task spans, got {len(task_spans)}" for task_span in task_spans: assert task_span.name == "Task: classification_task" if task_span.attributes is not None: assert task_span.attributes.get("openinference.span.kind") == "CHAIN" assert "input.value" in task_span.attributes assert "input.mime_type" in task_span.attributes assert "output.value" in task_span.attributes assert task_span.status.status_code.name == "OK" assert len(eval_spans) == 2, f"Expected 2 evaluation spans, got {len(eval_spans)}" for eval_span in eval_spans: assert eval_span.name == "Evaluation: accuracy_evaluator" if eval_span.attributes is not None: assert eval_span.attributes.get("openinference.span.kind") == "EVALUATOR" assert eval_span.status.status_code.name == "OK" for span in span_capture.spans: project_name = span.resource.attributes.get(ResourceAttributes.PROJECT_NAME) assert project_name is not None and project_name != "" for span in span_capture.spans: span_context = span.get_span_context() # type: ignore[no-untyped-call] if span_context is not None: assert span_context.trace_id != 0, "Span should have a valid trace ID" @pytest.mark.parametrize("is_async", [True, False]) async def test_run_experiment_with_evaluators( self, is_async: bool, _app: _AppInfo, ) -> None: api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_experiment_eval_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[ {"text": "Hello world"}, {"text": "Python is great"}, ], outputs=[ {"expected": "greeting"}, {"expected": "programming"}, ], ) ) def classification_task(input: Dict[str, Any]) -> str: text = input.get("text", "") if "Hello" in text: return "greeting" elif "Python" in text: return "programming" else: return "unknown" def accuracy_evaluator(output: str, expected: Dict[str, Any]) -> float: return 1.0 if output == expected.get("expected") else 0.0 def length_evaluator(output: str) -> Dict[str, Any]: return {"score": len(output) / 10.0, "label": "length_score"} result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=classification_task, evaluators=[accuracy_evaluator, length_evaluator], experiment_name=f"test_eval_experiment_{token_hex(4)}", print_summary=False, ) ) assert len(result["task_runs"]) == 2 assert len(result["evaluation_runs"]) > 0 @pytest.mark.parametrize("is_async", [True, False]) async def test_run_experiment_with_different_task_signatures( self, is_async: bool, _app: _AppInfo, ) -> None: api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_signatures_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[{"prompt": "Test prompt"}], outputs=[{"response": "Test response"}], metadata=[{"source": "test"}], ) ) def task_with_input_only(input: Dict[str, Any]) -> str: return f"Processed: {input.get('prompt', '')}" def task_with_multiple_params( input: Dict[str, Any], expected: Dict[str, Any], metadata: Dict[str, Any] ) -> str: return f"Input: {input}, Expected: {expected}, Meta: {metadata}" result1 = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=task_with_input_only, experiment_name=f"test_input_only_{token_hex(4)}", print_summary=False, ) ) assert len(result1["task_runs"]) == 1 result2 = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=task_with_multiple_params, experiment_name=f"test_multi_params_{token_hex(4)}", print_summary=False, ) ) assert len(result2["task_runs"]) == 1 @pytest.mark.parametrize("is_async", [True, False]) async def test_run_experiment_dry_run( self, is_async: bool, _app: _AppInfo, ) -> None: api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_dry_run_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[ {"text": "Sample 1"}, {"text": "Sample 2"}, {"text": "Sample 3"}, ], outputs=[ {"result": "Result 1"}, {"result": "Result 2"}, {"result": "Result 3"}, ], ) ) def simple_task(input: Dict[str, Any]) -> str: return f"Processed: {input.get('text', '')}" result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=simple_task, experiment_name="dry_run_test", dry_run=True, print_summary=False, ) ) assert result["experiment_id"] == "DRY_RUN" assert len(result["task_runs"]) == 1 result_sized = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=simple_task, experiment_name="dry_run_sized_test", dry_run=2, print_summary=False, ) ) assert len(result_sized["task_runs"]) == 2 @pytest.mark.parametrize("is_async", [True, False]) async def test_run_experiment_with_metadata( self, is_async: bool, _app: _AppInfo, ) -> None: api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_metadata_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[{"question": "Test question"}], outputs=[{"answer": "Test answer"}], ) ) def simple_task(input: Dict[str, Any]) -> str: return "Test response" experiment_metadata = { "version": "1.0", "model": "test-model", "temperature": 0.7, } result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=simple_task, experiment_name=f"test_with_metadata_{token_hex(4)}", experiment_description="Experiment with metadata", experiment_metadata=experiment_metadata, print_summary=False, ) ) assert len(result["task_runs"]) == 1 @pytest.mark.parametrize("is_async", [True, False]) async def test_task_and_evaluator_parameter_isolation( self, is_async: bool, _app: _AppInfo, ) -> None: api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient from phoenix.client.resources.experiments.types import ExampleProxy Client = AsyncClient if is_async else SyncClient unique_name = f"test_copying_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[{"text": "Hello"}], outputs=[{"expected": "greeting"}], metadata=[{"category": "test"}], ) ) def mutating_task( input: Dict[str, Any], expected: Dict[str, Any], metadata: Dict[str, Any], example: Any, ) -> str: input["added_by_task"] = True expected["added_by_task"] = True metadata["added_by_task"] = True return "ok" observations: Dict[str, Any] = { "task_example_is_proxy": None, "task_example_has_id": None, "task_example_input_text": None, "ev1_input_had_task": None, "ev1_expected_had_task": None, "ev1_metadata_had_task": None, "ev1_example_is_proxy": None, "ev1_example_has_id": None, "ev2_input_had_ev1": None, "ev2_expected_had_ev1": None, "ev2_metadata_had_ev1": None, "ev2_example_is_proxy": None, "ev2_example_has_id": None, } def evaluator_one( input: Dict[str, Any], expected: Dict[str, Any], metadata: Dict[str, Any], example: Any, ) -> float: observations["ev1_input_had_task"] = "added_by_task" in input observations["ev1_expected_had_task"] = "added_by_task" in expected observations["ev1_metadata_had_task"] = "added_by_task" in metadata observations["ev1_example_is_proxy"] = isinstance(example, ExampleProxy) observations["ev1_example_has_id"] = bool(example.get("id")) input["added_by_ev1"] = True expected["added_by_ev1"] = True metadata["added_by_ev1"] = True return 1.0 def evaluator_two( input: Dict[str, Any], expected: Dict[str, Any], metadata: Dict[str, Any], example: Any, ) -> float: observations["ev2_input_had_ev1"] = "added_by_ev1" in input observations["ev2_expected_had_ev1"] = "added_by_ev1" in expected observations["ev2_metadata_had_ev1"] = "added_by_ev1" in metadata observations["ev2_example_is_proxy"] = isinstance(example, ExampleProxy) observations["ev2_example_has_id"] = bool(example.get("id")) return 1.0 result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=mutating_task, evaluators=[evaluator_one, evaluator_two], experiment_name=f"test_copying_{token_hex(4)}", print_summary=False, ) ) assert len(result["task_runs"]) == 1 assert len(result["evaluation_runs"]) == 2 assert observations["ev1_input_had_task"] is False assert observations["ev1_expected_had_task"] is False assert observations["ev1_metadata_had_task"] is False assert observations["ev2_input_had_ev1"] is False assert observations["ev2_expected_had_ev1"] is False assert observations["ev2_metadata_had_ev1"] is False assert observations["ev1_example_is_proxy"] is True assert observations["ev1_example_has_id"] is True assert observations["ev2_example_is_proxy"] is True assert observations["ev2_example_has_id"] is True original_example = next(iter(dataset.examples)) assert "added_by_task" not in original_example["input"] assert "added_by_ev1" not in original_example["input"] @pytest.mark.parametrize("is_async", [True, False]) async def test_example_proxy_properties( self, is_async: bool, _app: _AppInfo, ) -> None: api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient from phoenix.client.resources.experiments.types import ExampleProxy Client = AsyncClient if is_async else SyncClient unique_name = f"test_example_proxy_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[{"question": "What is 2+2?"}], outputs=[{"answer": "4"}], metadata=[{"difficulty": "easy"}], ) ) observations: Dict[str, Any] = {} def example_inspector(example: Any) -> str: observations["is_example_proxy"] = isinstance(example, ExampleProxy) observations["has_id_property"] = hasattr(example, "id") observations["has_input_property"] = hasattr(example, "input") observations["has_output_property"] = hasattr(example, "output") observations["has_metadata_property"] = hasattr(example, "metadata") observations["has_updated_at_property"] = hasattr(example, "updated_at") observations["id_value"] = example.id if hasattr(example, "id") else None observations["input_value"] = dict(example.input) if hasattr(example, "input") else None observations["output_value"] = ( dict(example.output) if hasattr(example, "output") else None ) observations["metadata_value"] = ( dict(example.metadata) if hasattr(example, "metadata") else None ) observations["dict_access_id"] = example.get("id") observations["dict_access_input"] = example.get("input") observations["supports_iteration"] = ( list(example.keys()) if hasattr(example, "keys") else None ) return "ok" result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=example_inspector, experiment_name=f"test_proxy_{token_hex(4)}", print_summary=False, ) ) assert len(result["task_runs"]) == 1 assert observations["is_example_proxy"] is True assert observations["has_id_property"] is True assert observations["has_input_property"] is True assert observations["has_output_property"] is True assert observations["has_metadata_property"] is True assert observations["has_updated_at_property"] is True assert observations["id_value"] is not None assert observations["input_value"] == {"question": "What is 2+2?"} assert observations["output_value"] == {"answer": "4"} assert observations["metadata_value"] == {"difficulty": "easy"} assert observations["dict_access_id"] is not None assert observations["dict_access_input"] == {"question": "What is 2+2?"} assert observations["supports_iteration"] is not None assert "id" in observations["supports_iteration"] assert "input" in observations["supports_iteration"] @pytest.mark.parametrize("is_async", [True, False]) async def test_run_experiment_evaluator_types( self, is_async: bool, _app: _AppInfo, ) -> None: api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_eval_types_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[{"text": "Hello world"}], outputs=[{"expected": "greeting"}], ) ) def simple_task(input: Dict[str, Any]) -> str: return "greeting" def bool_evaluator(output: str) -> bool: return output == "greeting" def float_evaluator(output: str) -> float: return 0.95 def tuple_evaluator(output: str) -> tuple[float, str, str]: return (1.0, "correct", "The output matches expectation") def dict_evaluator(output: str) -> Dict[str, Any]: return {"score": 0.8, "label": "good"} result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=simple_task, evaluators={ "bool_eval": bool_evaluator, "float_eval": float_evaluator, "tuple_eval": tuple_evaluator, "dict_eval": dict_evaluator, }, experiment_name=f"test_eval_types_{token_hex(4)}", print_summary=False, ) ) assert len(result["task_runs"]) == 1 assert len(result["evaluation_runs"]) > 0 @pytest.mark.parametrize("is_async", [True, False]) async def test_run_async_task( self, is_async: bool, _app: _AppInfo, ) -> None: if not is_async: pytest.skip("Async tasks only supported with AsyncClient") api_key = _app.admin_secret from phoenix.client import AsyncClient unique_name = f"test_async_task_{token_hex(4)}" dataset: Dataset = await _await_or_return( AsyncClient(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[{"text": "Async test"}], outputs=[{"expected": "async_result"}], ) ) async def async_task(input: Dict[str, Any]) -> str: await asyncio.sleep(0.1) return f"async_processed_{input.get('text', '')}" result = await AsyncClient( base_url=_app.base_url, api_key=api_key ).experiments.run_experiment( dataset=dataset, task=async_task, experiment_name=f"test_async_{token_hex(4)}", print_summary=False, ) assert len(result["task_runs"]) == 1 assert "async_processed_" in result["task_runs"][0]["output"] @pytest.mark.parametrize("is_async", [True, False]) async def test_error_handling( self, is_async: bool, _app: _AppInfo, ) -> None: api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_error_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[{"text": "test"}], outputs=[{"expected": "result"}], ) ) def failing_task(input: Dict[str, Any]) -> str: raise ValueError("Task failed intentionally") result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=failing_task, experiment_name=f"test_error_{token_hex(4)}", print_summary=False, ) ) assert len(result["task_runs"]) == 1 assert "error" in result["task_runs"][0] or result["task_runs"][0]["output"] is None @pytest.mark.parametrize("is_async", [True, False]) async def test_experiment_with_empty_dataset( self, is_async: bool, _app: _AppInfo, ) -> None: api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_empty_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[{"placeholder": "temp"}], outputs=[{"placeholder": "temp"}], ) ) original_dataset_id = dataset.id original_version_id = dataset.version_id dataset._examples_data = v1.ListDatasetExamplesData( dataset_id=original_dataset_id, version_id=original_version_id, filtered_splits=[], examples=[], ) def simple_task(input: Dict[str, Any]) -> str: return "test" with pytest.raises(ValueError, match="Dataset has no examples"): await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, # pyright: ignore[reportArgumentType] task=simple_task, experiment_name="test_empty", print_summary=False, ) ) @pytest.mark.parametrize("is_async", [True, False]) async def test_evaluator_dynamic_parameter_binding( self, is_async: bool, _app: _AppInfo, ) -> None: api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_eval_params_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[ {"text": "What is 2+2?", "context": "math"}, {"text": "What is the capital of France?", "context": "geography"}, ], outputs=[ {"answer": "4", "category": "arithmetic"}, {"answer": "Paris", "category": "location"}, ], metadata=[ {"difficulty": "easy", "topic": "math"}, {"difficulty": "medium", "topic": "geography"}, ], ) ) def question_answering_task(input: Dict[str, Any]) -> str: question = input.get("text", "") if "2+2" in question: return "The answer is 4" elif "capital" in question: return "The answer is Paris" else: return "I don't know" def output_only_evaluator(output: str) -> float: return 1.0 if "answer" in output.lower() else 0.0 def accuracy_evaluator(output: str, expected: Dict[str, Any]) -> float: expected_answer = expected.get("answer", "") return 1.0 if expected_answer in output else 0.0 def comprehensive_evaluator( input: Dict[str, Any], output: str, expected: Dict[str, Any], reference: Dict[str, Any], metadata: Dict[str, Any], ) -> Dict[str, Any]: has_input = bool(input.get("text")) has_output = bool(output) has_expected = bool(expected.get("answer")) has_reference = bool(reference.get("answer")) has_metadata = bool(metadata.get("difficulty")) reference_matches_expected = reference == expected score = ( 1.0 if all( [ has_input, has_output, has_expected, has_reference, has_metadata, reference_matches_expected, ] ) else 0.0 ) return { "score": score, "label": "comprehensive_check", "explanation": ( f"Input: {has_input}, Output: {has_output}, Expected: {has_expected}, " f"Reference: {has_reference}, Metadata: {has_metadata}, " f"Reference==Expected: {reference_matches_expected}" ), } def reference_evaluator(output: str, reference: Dict[str, Any]) -> float: reference_answer = reference.get("answer", "") return 1.0 if reference_answer in output else 0.0 def metadata_evaluator(output: str, metadata: Dict[str, Any]) -> Dict[str, Any]: difficulty = metadata.get("difficulty", "unknown") topic = metadata.get("topic", "unknown") return { "score": 0.8 if difficulty == "easy" else 0.6, "label": f"{difficulty}_{topic}", "explanation": f"Difficulty: {difficulty}, Topic: {topic}", } def example_evaluator(example: Dict[str, Any]) -> Dict[str, Any]: has_id = bool(example.get("id")) has_input = isinstance(example.get("input"), dict) return { "score": 1.0 if has_id and has_input else 0.0, "label": "has_example", } result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=question_answering_task, evaluators={ "output_only": output_only_evaluator, "relevance": accuracy_evaluator, "comprehensive": comprehensive_evaluator, "reference": reference_evaluator, "metadata": metadata_evaluator, "example": example_evaluator, }, experiment_name=f"test_param_binding_{token_hex(4)}", print_summary=False, ) ) assert len(result["task_runs"]) == 2 assert len(result["evaluation_runs"]) == 12 # 2 examples * 6 evaluators comprehensive_evals = [ eval_run for eval_run in result["evaluation_runs"] if eval_run.name == "comprehensive" ] assert len(comprehensive_evals) == 2 for eval_run in comprehensive_evals: assert eval_run.result is not None assert isinstance(eval_run.result, dict) assert eval_run.result.get("score") == 1.0 assert "comprehensive_check" in (eval_run.result.get("label") or "") reference_evals = [ eval_run for eval_run in result["evaluation_runs"] if eval_run.name == "reference" ] assert len(reference_evals) == 2 for eval_run in reference_evals: assert eval_run.result is not None assert isinstance(eval_run.result, dict) assert eval_run.result.get("score") == 1.0 metadata_evals = [ eval_run for eval_run in result["evaluation_runs"] if eval_run.name == "metadata" ] assert len(metadata_evals) == 2 for eval_run in metadata_evals: assert eval_run.result is not None assert isinstance(eval_run.result, dict) assert eval_run.result.get("score") is not None assert eval_run.result.get("label") is not None assert eval_run.result.get("explanation") is not None example_evals = [ eval_run for eval_run in result["evaluation_runs"] if eval_run.name == "example" ] assert len(example_evals) == 2 for eval_run in example_evals: assert eval_run.result is not None assert isinstance(eval_run.result, dict) assert eval_run.result.get("score") == 1.0 assert eval_run.result.get("label") == "has_example" @pytest.mark.parametrize("is_async", [True, False]) async def test_task_dynamic_parameter_binding( self, is_async: bool, _app: _AppInfo, ) -> None: api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_task_params_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[ {"question": "What is 2+2?", "type": "math"}, {"question": "What is the capital of France?", "type": "geography"}, ], outputs=[ {"answer": "4", "explanation": "Basic arithmetic"}, {"answer": "Paris", "explanation": "Capital city of France"}, ], metadata=[ {"difficulty": "easy", "category": "arithmetic", "source": "textbook"}, {"difficulty": "medium", "category": "geography", "source": "atlas"}, ], ) ) def input_only_task(input: Dict[str, Any]) -> str: question = input.get("question", "") return f"Processing: {question}" def input_expected_task(input: Dict[str, Any], expected: Dict[str, Any]) -> str: question = input.get("question", "") expected_answer = expected.get("answer", "") return f"Question: {question}, Expected: {expected_answer}" def reference_task(input: Dict[str, Any], reference: Dict[str, Any]) -> str: question = input.get("question", "") ref_answer = reference.get("answer", "") return f"Q: {question}, Ref: {ref_answer}" def metadata_task(input: Dict[str, Any], metadata: Dict[str, Any]) -> str: question = input.get("question", "") difficulty = metadata.get("difficulty", "unknown") category = metadata.get("category", "unknown") return f"Q: {question} [Difficulty: {difficulty}, Category: {category}]" def comprehensive_task( input: Dict[str, Any], expected: Dict[str, Any], reference: Dict[str, Any], metadata: Dict[str, Any], example: Dict[str, Any], ) -> Dict[str, Any]: has_input = bool(input.get("question")) has_expected = bool(expected.get("answer")) has_reference = bool(reference.get("answer")) has_metadata = bool(metadata.get("difficulty")) has_example = bool(example.get("id")) # Example should have an ID reference_matches_expected = reference == expected success = all( [ has_input, has_expected, has_reference, has_metadata, has_example, reference_matches_expected, ] ) return { "success": success, "question": input.get("question", ""), "expected_answer": expected.get("answer", ""), "metadata_difficulty": metadata.get("difficulty", ""), "example_id": example.get("id", ""), "reference_matches_expected": reference_matches_expected, } result1 = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=input_only_task, experiment_name=f"test_input_only_{token_hex(4)}", print_summary=False, ) ) assert len(result1["task_runs"]) == 2 for task_run in result1["task_runs"]: assert "Processing:" in task_run["output"] result2 = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=input_expected_task, experiment_name=f"test_input_expected_{token_hex(4)}", print_summary=False, ) ) assert len(result2["task_runs"]) == 2 for task_run in result2["task_runs"]: assert "Question:" in task_run["output"] assert "Expected:" in task_run["output"] result3 = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=reference_task, experiment_name=f"test_reference_{token_hex(4)}", print_summary=False, ) ) assert len(result3["task_runs"]) == 2 for task_run in result3["task_runs"]: assert "Q:" in task_run["output"] assert "Ref:" in task_run["output"] result4 = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=metadata_task, experiment_name=f"test_metadata_{token_hex(4)}", print_summary=False, ) ) assert len(result4["task_runs"]) == 2 for task_run in result4["task_runs"]: assert "Difficulty:" in task_run["output"] assert "Category:" in task_run["output"] result5 = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=comprehensive_task, experiment_name=f"test_comprehensive_{token_hex(4)}", print_summary=False, ) ) assert len(result5["task_runs"]) == 2 for task_run in result5["task_runs"]: output = task_run["output"] assert isinstance(output, dict) assert output["success"] is True assert output["reference_matches_expected"] is True assert output["question"] != "" assert output["expected_answer"] != "" assert output["metadata_difficulty"] != "" assert output["example_id"] != "" @pytest.mark.parametrize("is_async", [True, False]) async def test_get_experiment( self, is_async: bool, _app: _AppInfo, _setup_experiment_test: _SetupExperimentTest, ) -> None: """Test getting a single experiment by ID.""" client, helper = _setup_experiment_test(is_async) # Create a dataset, experiment, and run dataset_id, examples = helper.create_dataset( inputs=[{"q": "test"}], outputs=[{"a": "answer"}], ) exp = helper.create_experiment(dataset_id, repetitions=1) helper.create_runs(exp["id"], [(examples[0]["id"], 1, "response", None)]) # Test get method retrieved = await _await_or_return(client.experiments.get(experiment_id=exp["id"])) assert retrieved["id"] == exp["id"] assert retrieved["dataset_id"] == dataset_id assert retrieved["repetitions"] == 1 assert retrieved["example_count"] == 1 assert retrieved["successful_run_count"] == 1 assert "created_at" in retrieved assert "updated_at" in retrieved @pytest.mark.parametrize("is_async", [True, False]) async def test_get_experiment_not_found( self, is_async: bool, _app: _AppInfo, _setup_experiment_test: _SetupExperimentTest, ) -> None: """Test that getting a non-existent experiment raises ValueError.""" client, _ = _setup_experiment_test(is_async) fake_id = str(GlobalID("Experiment", "999999")) with pytest.raises(ValueError, match="Experiment not found"): await _await_or_return(client.experiments.get(experiment_id=fake_id)) @pytest.mark.parametrize("is_async", [True, False]) async def test_list_experiments( self, is_async: bool, _app: _AppInfo, _setup_experiment_test: _SetupExperimentTest, ) -> None: """Test listing experiments for a dataset.""" client, helper = _setup_experiment_test(is_async) # Create a dataset dataset_id, examples = helper.create_dataset( inputs=[{"q": f"test{i}"} for i in range(3)], outputs=[{"a": f"answer{i}"} for i in range(3)], ) # Create multiple experiments with runs exp_ids: list[str] = [] for i in range(3): exp = helper.create_experiment(dataset_id, repetitions=1) # Create runs for all examples in one call runs: list[tuple[str, int, Optional[str], Optional[str]]] = [ (ex["id"], 1, f"response_{i}", None) for ex in examples ] helper.create_runs(exp["id"], runs) exp_ids.append(exp["id"]) # List experiments experiments = await _await_or_return(client.experiments.list(dataset_id=dataset_id)) assert len(experiments) == 3 for exp in experiments: assert exp["dataset_id"] == dataset_id assert exp["example_count"] == 3 assert exp["successful_run_count"] == 3 assert "id" in exp assert "created_at" in exp assert "updated_at" in exp # Verify all created experiments are in the list retrieved_ids = [exp["id"] for exp in experiments] for exp_id in exp_ids: assert exp_id in retrieved_ids @pytest.mark.parametrize("is_async", [True, False]) async def test_list_experiments_empty( self, is_async: bool, _app: _AppInfo, _setup_experiment_test: _SetupExperimentTest, ) -> None: """Test listing experiments for a dataset with no experiments.""" client, helper = _setup_experiment_test(is_async) # Create a dataset with no experiments dataset_id, _ = helper.create_dataset( inputs=[{"q": "test"}], outputs=[{"a": "answer"}], ) # List experiments experiments = await _await_or_return(client.experiments.list(dataset_id=dataset_id)) assert len(experiments) == 0 @pytest.mark.parametrize("is_async", [True, False]) async def test_list_experiments_pagination( self, is_async: bool, _app: _AppInfo, _setup_experiment_test: _SetupExperimentTest, ) -> None: """Test that list_experiments supports cursor-based pagination.""" client, helper = _setup_experiment_test(is_async) # Create a dataset dataset_id, _ = helper.create_dataset( inputs=[{"q": "test"}], outputs=[{"a": "answer"}], ) # Create multiple experiments (6 experiments) experiment_ids: list[str] = [] for _ in range(6): exp = helper.create_experiment(dataset_id, repetitions=1) experiment_ids.append(exp["id"]) # Test manual pagination with HTTP client directly to debug all_paginated_ids: list[str] = [] cursor = None page_count = 0 while True: page_count += 1 params: dict[str, Any] = {"limit": 2} if cursor: params["cursor"] = cursor resp = helper.http_client.get(f"v1/datasets/{dataset_id}/experiments", params=params) resp.raise_for_status() page_data = resp.json() all_paginated_ids.extend([cast(str, exp["id"]) for exp in page_data["data"]]) # Verify page size (should be 2 except possibly the last page) assert len(page_data["data"]) <= 2 cursor = page_data.get("next_cursor") if not cursor: break # Safety check to prevent infinite loop assert page_count <= 10, "Pagination took too many pages" # Verify we got all experiments assert len(all_paginated_ids) == 6 assert set(all_paginated_ids) == set(experiment_ids) # Verify automatic pagination with list() gets all experiments all_experiments = await _await_or_return(client.experiments.list(dataset_id=dataset_id)) assert len(all_experiments) == 6 assert set(exp["id"] for exp in all_experiments) == set(experiment_ids) @pytest.mark.parametrize("is_async", [True, False]) async def test_delete_experiment( self, is_async: bool, _app: _AppInfo, _setup_experiment_test: _SetupExperimentTest, ) -> None: """Test deleting an experiment.""" client, helper = _setup_experiment_test(is_async) # Create a dataset, experiment, and run dataset_id, examples = helper.create_dataset( inputs=[{"q": "test"}], outputs=[{"a": "answer"}], ) exp = helper.create_experiment(dataset_id, repetitions=1) helper.create_runs(exp["id"], [(examples[0]["id"], 1, "response", None)]) # Delete the experiment await _await_or_return(client.experiments.delete(experiment_id=exp["id"])) # Verify experiment no longer exists with pytest.raises(ValueError, match="Experiment not found"): await _await_or_return(client.experiments.get(experiment_id=exp["id"])) # Verify it's not in the list experiments = await _await_or_return(client.experiments.list(dataset_id=dataset_id)) experiment_ids = [exp["id"] for exp in experiments] assert exp["id"] not in experiment_ids @pytest.mark.parametrize("is_async", [True, False]) async def test_delete_experiment_not_found( self, is_async: bool, _app: _AppInfo, _setup_experiment_test: _SetupExperimentTest, ) -> None: """Test that deleting a non-existent experiment raises ValueError.""" client, _ = _setup_experiment_test(is_async) fake_id = str(GlobalID("Experiment", "999999")) with pytest.raises(ValueError, match="Experiment not found"): await _await_or_return(client.experiments.delete(experiment_id=fake_id)) async def test_experiment_run_upsert_protection( self, _app: _AppInfo, _setup_experiment_test: _SetupExperimentTest, ) -> None: """ Test that experiment runs with successful results cannot be overwritten. Verifies: 1. Successful runs (error=None) return 409 Conflict when update is attempted 2. Failed runs (error not None) can be updated with new errors 3. Failed runs can be updated to successful runs 4. Once updated to successful, runs become protected from further updates """ _, helper = _setup_experiment_test(False) # Create dataset and experiment dataset_id, examples = helper.create_dataset( inputs=[{"q": "test1"}, {"q": "test2"}], outputs=[{"a": "answer1"}, {"a": "answer2"}], ) exp = helper.create_experiment(dataset_id, repetitions=1) # Test 1: Create a successful run (error=None) successful_run = helper.create_runs( exp["id"], [(examples[0]["id"], 1, "original_output", None)] )[0] assert successful_run["id"] is not None # Test 2: Attempt to update the successful run - should return 409 response = helper.http_client.post( f"v1/experiments/{exp['id']}/runs", json={ "dataset_example_id": examples[0]["id"], "repetition_number": 1, "output": "updated_output", "error": None, "start_time": helper.now, "end_time": helper.now, }, ) assert response.status_code == 409 assert "already exists with a successful result" in response.text # Test 3: Create a failed run (error not None) failed_run = helper.create_runs( exp["id"], [(examples[1]["id"], 1, "failed_output", "Some error")] )[0] assert failed_run["id"] is not None # Verify the failed run was created with the error runs = helper.get_runs(exp["id"]) example1_runs = [r for r in runs if r["dataset_example_id"] == examples[1]["id"]] assert len(example1_runs) == 1 assert example1_runs[0]["output"] == "failed_output" assert example1_runs[0].get("error") == "Some error" # Test 4: Update the failed run with a new error - should succeed updated_run = helper.create_runs( exp["id"], [(examples[1]["id"], 1, "retried_output", "New error message")] )[0] assert updated_run["id"] is not None # Verify the error was updated runs = helper.get_runs(exp["id"]) example1_runs = [r for r in runs if r["dataset_example_id"] == examples[1]["id"]] assert len(example1_runs) == 1 assert example1_runs[0]["output"] == "retried_output" assert example1_runs[0].get("error") == "New error message" # Test 5: Update the failed run to successful - should succeed successful_retry = helper.create_runs( exp["id"], [(examples[1]["id"], 1, "final_output", None)] )[0] assert successful_retry["id"] is not None # Verify the run is now successful (error is None) runs = helper.get_runs(exp["id"]) example1_runs = [r for r in runs if r["dataset_example_id"] == examples[1]["id"]] assert len(example1_runs) == 1 assert example1_runs[0]["output"] == "final_output" assert example1_runs[0].get("error") is None # Test 6: Now the previously-failed run is successful, it should be protected response = helper.http_client.post( f"v1/experiments/{exp['id']}/runs", json={ "dataset_example_id": examples[1]["id"], "repetition_number": 1, "output": "another_attempt", "error": None, "start_time": helper.now, "end_time": helper.now, }, ) assert response.status_code == 409 @pytest.mark.parametrize("is_async", [True, False]) async def test_create_experiment( self, is_async: bool, _app: _AppInfo, _setup_experiment_test: _SetupExperimentTest, ) -> None: """Test creating an experiment without running it.""" api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient from .._helpers import _gql Client = AsyncClient if is_async else SyncClient client = Client(base_url=_app.base_url, api_key=api_key) unique_name = f"test_create_{token_hex(4)}" # Create dataset with 4 examples dataset = await _await_or_return( client.datasets.create_dataset( name=unique_name, inputs=[{"q": f"test{i}"} for i in range(4)], outputs=[{"a": f"answer{i}"} for i in range(4)], ) ) example_ids = [example["id"] for example in dataset.examples] # Create a split with first 2 examples split_mutation = """ mutation($input: CreateDatasetSplitWithExamplesInput!) { createDatasetSplitWithExamples(input: $input) { datasetSplit { id name } } } """ split_name = f"{unique_name}_train" _gql( _app, _app.admin_secret, query=split_mutation, variables={ "input": { "name": split_name, "color": "#FF0000", "exampleIds": [example_ids[0], example_ids[1]], } }, ) # Create experiment with metadata and splits experiment = await _await_or_return( client.experiments.create( dataset_id=dataset.id, experiment_name=f"Test Create {token_hex(4)}", experiment_description="Test description", experiment_metadata={"model": "test"}, splits=[split_name], repetitions=3, ) ) assert experiment["id"] is not None assert experiment["dataset_id"] == dataset.id assert experiment["missing_run_count"] == 6 # 2 examples * 3 repetitions assert experiment["example_count"] == 2 # Only 2 examples in the split assert experiment["metadata"] == {"model": "test"} class TestResumeOperations: """ Comprehensive test suite for resume operations (resume_experiment and resume_evaluation). resume_experiment: Re-runs incomplete or failed experiment task runs. resume_evaluation: Re-runs incomplete or failed experiment evaluations. Both operations support: - Recovering from transient failures - Completing partially completed experiments - Early exit optimizations Tests cover: **resume_experiment:** - Failed task runs recovery - Missing runs (never created) - Mixed failed and missing scenarios - Repetition-level granularity - Error handling and validation **resume_evaluation:** - Missing evaluations (never run) - Failed evaluations (ran but had errors) - Successful evaluations (should not be re-run) - Early exit when all complete """ @pytest.mark.parametrize("is_async", [True, False]) async def test_resume_incomplete_runs_comprehensive( self, is_async: bool, _app: _AppInfo, _setup_experiment_test: _SetupExperimentTest ) -> None: """ Comprehensive test for resuming incomplete runs. Tests all incomplete run scenarios in one test: 1. Failed runs are resumed (examples 0-1) 2. Successful runs are preserved (example 2) 3. Missing runs are created (examples 3-4) 4. Mixed failed+missing work together (examples 0-4) 5. Repetition-level granularity (example 5: only failed rep 2) This consolidates test_basic_resume, test_missing_runs, test_mixed_missing_and_failed, and test_repetition_level_granularity. """ client, helper = _setup_experiment_test(is_async) # Create first dataset for comprehensive mixed scenarios (examples 0-4) dataset_id_1, examples_1 = helper.create_dataset( inputs=[{"idx": i} for i in range(5)], outputs=[{"result": i} for i in range(5)], ) # Create second dataset for repetition-level granularity test (example 5) dataset_id_2, examples_2 = helper.create_dataset( inputs=[{"idx": 5}], outputs=[{"result": 5}], ) # Experiment 1: Examples 0-4 with 2 repetitions each exp = helper.create_experiment(dataset_id_1, repetitions=2) helper.create_runs( exp["id"], [ # Example 0: failed runs (both reps) (examples_1[0]["id"], 1, None, "Failed 0"), (examples_1[0]["id"], 2, None, "Failed 0"), # Example 1: failed runs (both reps) (examples_1[1]["id"], 1, None, "Failed 1"), (examples_1[1]["id"], 2, None, "Failed 1"), # Example 2: successful runs (will be preserved) (examples_1[2]["id"], 1, "Success 2", None), (examples_1[2]["id"], 2, "Success 2", None), # Examples 3-4: missing runs (not created, but should exist per repetitions=2) ], ) # Experiment 2: Example 5 with 3 repetitions (repetition-level test) exp_reps = helper.create_experiment(dataset_id_2, repetitions=3) helper.create_runs( exp_reps["id"], [ (examples_2[0]["id"], 1, "Success rep 1", None), (examples_2[0]["id"], 2, None, "Failed rep 2"), (examples_2[0]["id"], 3, "Success rep 3", None), ], ) # Track execution processed: set[int] = set() call_count = [0] def tracking_task(input: dict[str, Any]) -> str: idx = cast(int, input["idx"]) processed.add(idx) call_count[0] += 1 return f"Resumed {idx}" # Resume experiment 1 (examples 0-4) await _await_or_return( client.experiments.resume_experiment( experiment_id=exp["id"], task=tracking_task, print_summary=False, ) ) # Verify experiment has correct counts after resuming # 5 examples × 2 repetitions = 10 runs total resumed_exp = await _await_or_return(client.experiments.get(experiment_id=exp["id"])) assert resumed_exp["successful_run_count"] == 10 assert {0, 1, 3, 4} <= processed, "Should process failed and missing examples" # Verify outputs helper.assert_output_by_example( exp["id"], expected={ 0: "Resumed 0", # Failed → resumed (both reps) 1: "Resumed 1", # Failed → resumed (both reps) 2: "Success 2", # Successful → preserved (both reps) 3: "Resumed 3", # Missing → created (both reps) 4: "Resumed 4", # Missing → created (both reps) }, examples=examples_1, ) # Resume experiment 2 (example 5 with repetitions) call_count[0] = 0 # Reset counter await _await_or_return( client.experiments.resume_experiment( experiment_id=exp_reps["id"], task=tracking_task, print_summary=False, ) ) # Verify only 1 repetition was re-run assert call_count[0] == 1, "Should only re-run 1 failed repetition" assert 5 in processed # Verify repetition-level precision helper.assert_output_by_example( exp_reps["id"], expected={ 0: ["Success rep 1", "Resumed 5", "Success rep 3"], }, examples=examples_2, ) @pytest.mark.parametrize("is_async", [True, False]) async def test_early_exit_when_complete( self, is_async: bool, _app: _AppInfo, _setup_experiment_test: _SetupExperimentTest ) -> None: """ Test early exit optimization when all runs/evaluations are complete. Tests: 1. resume_experiment early exit when all task runs are successful 2. resume_evaluation early exit when all evaluations are complete Verifies that both operations detect when there's nothing to do and skip unnecessary function calls (optimization). """ client, helper = _setup_experiment_test(is_async) # Scenario 1: resume_experiment early exit dataset_id, examples = helper.create_dataset( inputs=[{"q": "Q1"}], outputs=[{"a": "A1"}], ) exp = helper.create_experiment(dataset_id, repetitions=1) helper.create_runs(exp["id"], [(examples[0]["id"], 1, "Complete", None)]) task_call_count = [0] def noop_task(input: dict[str, Any]) -> str: task_call_count[0] += 1 return "Result" await _await_or_return( client.experiments.resume_experiment( experiment_id=exp["id"], task=noop_task, print_summary=False, ) ) assert task_call_count[0] == 0, "Task should not be called when no incomplete runs" # Verify experiment state after early exit resumed_exp = await _await_or_return(client.experiments.get(experiment_id=exp["id"])) assert resumed_exp["id"] == exp["id"] assert resumed_exp["example_count"] == 1 assert resumed_exp["successful_run_count"] == 1 # Scenario 2: resume_evaluation early exit # Add successful evaluation for "relevance" helper.create_evaluations(exp["id"], ["relevance"], []) eval_call_count = [0] def accuracy_evaluator(output: Any) -> float: eval_call_count[0] += 1 return 1.0 # Try to resume with same evaluator (should early exit, no re-run) await _await_or_return( client.experiments.resume_evaluation( experiment_id=exp["id"], evaluators={"relevance": accuracy_evaluator}, # pyright: ignore[reportUnknownLambdaType,reportUnknownArgumentType] print_summary=False, ) ) assert eval_call_count[0] == 0, "Evaluator should not be called when evaluation is complete" # Verify the existing evaluation was not modified data = helper.get_experiment_annotations(exp["id"]) helper.assert_annotations( runs_data=data["runs"]["edges"], expected_count=1, expected_by_run={"relevance": 1.0}, ) @pytest.mark.parametrize("is_async", [True, False]) async def test_error_scenarios( self, is_async: bool, _app: _AppInfo, _setup_experiment_test: _SetupExperimentTest ) -> None: """ Comprehensive error handling test covering multiple error scenarios. Tests: 1. Resume task continues to fail - new error is recorded 2. Resume evaluation continues to fail - new error is recorded 3. Invalid experiment ID (non-existent ID) 4. Empty evaluators dict for resume_evaluation Verifies that all error conditions are handled gracefully with appropriate error messages and the system remains stable. """ client, helper = _setup_experiment_test(is_async) # Scenario 1: Resume task continues to fail dataset_id, examples = helper.create_dataset( inputs=[{"x": i} for i in range(3)], outputs=[{"y": i} for i in range(3)], ) exp = helper.create_experiment(dataset_id, repetitions=1) helper.create_runs( exp["id"], [(examples[i]["id"], 1, None, "Original error") for i in range(3)] ) def failing_task(input: dict[str, Any]) -> str: raise ValueError("Task still fails on resume") await _await_or_return( client.experiments.resume_experiment( experiment_id=exp["id"], task=failing_task, print_summary=False, ) ) # Verify NEW errors are recorded in the runs (not the original errors) runs = helper.get_runs(exp["id"]) assert len([r for r in runs if r.get("error")]) == 3, "All runs should still have errors" for run in runs: error_msg = run.get("error", "") assert "Task still fails on resume" in error_msg, ( f"Expected new error message, got: {error_msg}" ) assert "Original error" not in error_msg, "Should have new error, not original" # Scenario 2: Resume evaluation continues to fail exp2 = helper.create_experiment(dataset_id, repetitions=1) helper.create_runs( exp2["id"], [(examples[i]["id"], 1, f"output_{i}", None) for i in range(3)] ) # Add failed evaluations helper.create_evaluations(exp2["id"], [], ["quality"]) def failing_evaluator(output: Any) -> float: raise ValueError("Evaluator still fails on resume") await _await_or_return( client.experiments.resume_evaluation( experiment_id=exp2["id"], evaluators={"quality": failing_evaluator}, # pyright: ignore[reportUnknownLambdaType,reportUnknownArgumentType] print_summary=False, ) ) # Verify NEW evaluation errors are recorded data = helper.get_experiment_annotations(exp2["id"]) annotations = data["runs"]["edges"] assert len(annotations) == 3, "Should have 3 runs" for run_edge in annotations: run_annotations = run_edge["run"]["annotations"]["edges"] quality_annotations = [ a["annotation"] for a in run_annotations if a["annotation"]["name"] == "quality" ] assert len(quality_annotations) == 1, "Should have one quality annotation per run" annotation = quality_annotations[0] assert annotation["error"] is not None, "Annotation should have error" assert "Evaluator still fails on resume" in annotation["error"], ( f"Expected new evaluation error message, got: {annotation['error']}" ) # Scenario 3: Invalid experiment ID with pytest.raises(ValueError, match="Experiment not found"): await _await_or_return( client.experiments.resume_experiment( experiment_id=str(GlobalID("Experiment", "999999")), task=lambda input: "x", # pyright: ignore[reportUnknownLambdaType,reportUnknownArgumentType] print_summary=False, ) ) # Scenario 4: Empty evaluators for resume_evaluation with pytest.raises(ValueError, match="Must specify at least one evaluator"): await _await_or_return( client.experiments.resume_evaluation( experiment_id=exp["id"], evaluators={}, # Empty dict - should fail validation print_summary=False, ) ) @pytest.mark.parametrize("is_async", [True, False]) async def test_resume_experiment_with_evaluators( self, is_async: bool, _app: _AppInfo, _setup_experiment_test: _SetupExperimentTest ) -> None: """ Test resume_experiment with evaluators integration. Validates that: - Task runs are completed first - Evaluators are automatically run on completed runs - The integration between resume_experiment and resume_evaluation works correctly """ client, helper = _setup_experiment_test(is_async) dataset_id, examples = helper.create_dataset( inputs=[{"q": f"Q{i}"} for i in range(2)], outputs=[{"a": f"A{i}"} for i in range(2)], ) exp = helper.create_experiment(dataset_id, repetitions=1) # Create only failed runs helper.create_runs( exp["id"], [ (examples[0]["id"], 1, None, "Failed 1"), (examples[1]["id"], 1, None, "Failed 2"), ], ) # Resume with both task and evaluators await _await_or_return( client.experiments.resume_experiment( experiment_id=exp["id"], task=lambda input: f"Resumed {cast(str, input['q'])}", # pyright: ignore[reportUnknownLambdaType,reportUnknownArgumentType] evaluators={"quality": lambda output: 0.9}, # pyright: ignore[reportUnknownLambdaType,reportUnknownArgumentType] print_summary=False, ) ) # Verify task runs completed resumed_exp = await _await_or_return(client.experiments.get(experiment_id=exp["id"])) assert resumed_exp["id"] == exp["id"] assert resumed_exp["successful_run_count"] == 2 # Verify task outputs were persisted helper.assert_output_by_example( exp["id"], expected={ 0: "Resumed Q0", 1: "Resumed Q1", }, examples=examples, ) # Verify evaluations were run data = helper.get_experiment_annotations(exp["id"]) helper.assert_annotations( runs_data=data["runs"]["edges"], expected_count=2, expected_by_run={"quality": 0.9}, ) @pytest.mark.parametrize("is_async", [True, False]) async def test_resume_evaluation_comprehensive( self, is_async: bool, _app: _AppInfo, _setup_experiment_test: _SetupExperimentTest ) -> None: """ Comprehensive test for resume_evaluation covering all scenarios. Tests: 1. Successful evaluations are NOT re-run (accuracy, relevance preserved) 2. Failed evaluations ARE re-run (quality re-executed) 3. Missing evaluations ARE run (toxicity added) 4. Selective retry: can resume only specific evaluators 5. Pagination: large experiments with > 50 runs are handled correctly 6. Call count tracking verifies correct execution This provides complete coverage of resume_evaluation logic. """ client, helper = _setup_experiment_test(is_async) # Create experiment with 3 runs dataset_id, examples = helper.create_dataset( inputs=[{"input": f"input_{i}"} for i in range(3)], outputs=[{"output": f"output_{i}"} for i in range(3)], ) exp = helper.create_experiment(dataset_id, repetitions=1) helper.create_runs( exp["id"], [(examples[i]["id"], 1, f"result_{i}", None) for i in range(3)], ) # Add evaluations: accuracy and relevance successful, quality failed helper.create_evaluations(exp["id"], ["accuracy", "relevance"], ["quality"]) # Part 1: Selective retry - resume only the failed "quality" evaluator # Other successful evaluators (accuracy, relevance) should be preserved quality_call_count = [0] def quality_evaluator(output: Any) -> float: quality_call_count[0] += 1 return 0.95 await _await_or_return( client.experiments.resume_evaluation( experiment_id=exp["id"], evaluators={"quality": quality_evaluator}, # pyright: ignore[reportUnknownLambdaType,reportUnknownArgumentType] print_summary=False, ) ) # Verify quality evaluator was called exactly 3 times (once per run) assert quality_call_count[0] == 3, "Quality evaluator should run for all 3 runs" # Verify all three evaluators are present with correct scores data = helper.get_experiment_annotations(exp["id"]) helper.assert_annotations( runs_data=data["runs"]["edges"], expected_count=3, expected_by_run={ "accuracy": 1.0, # Preserved from original successful run "quality": 0.95, # Updated from failed to successful "relevance": 1.0, # Preserved from original successful run }, ) # Part 2: Add missing evaluator - resume with new "toxicity" evaluator toxicity_call_count = [0] def toxicity_evaluator(output: Any) -> float: toxicity_call_count[0] += 1 return 0.1 await _await_or_return( client.experiments.resume_evaluation( experiment_id=exp["id"], evaluators={"toxicity": toxicity_evaluator}, # pyright: ignore[reportUnknownLambdaType,reportUnknownArgumentType] print_summary=False, ) ) # Verify toxicity evaluator was called 3 times (once per run) assert toxicity_call_count[0] == 3, "Toxicity evaluator should run for all 3 runs" # Verify all four evaluators are now present data = helper.get_experiment_annotations(exp["id"]) helper.assert_annotations( runs_data=data["runs"]["edges"], expected_count=3, expected_by_run={ "accuracy": 1.0, # Still preserved "quality": 0.95, # Still updated "relevance": 1.0, # Still preserved "toxicity": 0.1, # Newly added }, ) # Part 3: Pagination - test with large experiment (>50 runs) num_examples = 75 dataset_id_large, examples_large = helper.create_dataset( inputs=[{"x": i} for i in range(num_examples)], outputs=[{"y": i * 2} for i in range(num_examples)], ) exp_large = helper.create_experiment(dataset_id_large, repetitions=1) helper.create_runs( exp_large["id"], [(examples_large[i]["id"], 1, f"output_{i}", None) for i in range(num_examples)], ) # All runs are successful but missing "pagination_test" evaluation evaluated_indices: set[int] = set() def pagination_evaluator(output: Any) -> float: # Extract index from output to track which runs were evaluated output_str = str(output) # pyright: ignore[reportUnknownArgumentType] idx = int(output_str.split("_")[1]) evaluated_indices.add(idx) return 0.8 await _await_or_return( client.experiments.resume_evaluation( experiment_id=exp_large["id"], evaluators={"pagination_test": pagination_evaluator}, # pyright: ignore[reportUnknownLambdaType,reportUnknownArgumentType] print_summary=False, ) ) # Verify all runs were evaluated (no skips due to pagination) assert evaluated_indices == set(range(num_examples)), ( f"All {num_examples} runs should be evaluated across pagination boundaries, " f"but only {len(evaluated_indices)} were evaluated" ) # Verify evaluations were persisted to the database data_large = helper.get_experiment_annotations(exp_large["id"]) helper.assert_annotations( runs_data=data_large["runs"]["edges"], expected_count=num_examples, expected_by_run={"pagination_test": 0.8}, ) class TestEvaluateExperiment: """Test the run_experiment -> evaluate_experiment pattern from legacy implementation.""" @pytest.mark.parametrize("is_async", [True, False]) async def test_run_experiment_then_evaluate_experiment_pattern( self, is_async: bool, _app: _AppInfo, ) -> None: """Test running experiment without evaluators, then adding evaluations separately.""" api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_evaluate_pattern_{token_hex(4)}" # Create test dataset dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[ {"question": "What is 2+2?"}, {"question": "What is the capital of France?"}, {"question": "Who wrote Python?"}, ], outputs=[ {"answer": "4"}, {"answer": "Paris"}, {"answer": "Guido van Rossum"}, ], metadata=[ {"category": "math"}, {"category": "geography"}, {"category": "programming"}, ], ) ) def simple_task(input: Dict[str, Any]) -> str: question = input.get("question", "") if "2+2" in question: return "The answer is 4" elif "capital" in question: return "The capital is Paris" elif "Python" in question: return "Guido van Rossum created Python" else: return "I don't know" def accuracy_evaluator(output: str, expected: Dict[str, Any]) -> float: expected_answer = expected.get("answer", "") return 1.0 if expected_answer in output else 0.0 def length_evaluator(output: str) -> Dict[str, Any]: return {"score": len(output) / 20.0, "label": "length_score"} # Step 1: Run experiment WITHOUT evaluators (task execution only) initial_result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=dataset, task=simple_task, experiment_name=f"test_initial_{token_hex(4)}", print_summary=False, ) ) # Verify initial result has no evaluations but has task runs assert "experiment_id" in initial_result assert "dataset_id" in initial_result assert "task_runs" in initial_result assert "evaluation_runs" in initial_result assert initial_result["dataset_id"] == dataset.id assert len(initial_result["task_runs"]) == 3 assert len(initial_result["evaluation_runs"]) == 0 # No evaluations yet # Step 2: Add evaluations to the completed experiment # This will test the new evaluate_experiment method eval_result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.evaluate_experiment( experiment=initial_result, evaluators=[accuracy_evaluator, length_evaluator], print_summary=False, ) ) # Verify evaluation results assert "experiment_id" in eval_result assert "dataset_id" in eval_result assert "task_runs" in eval_result assert "evaluation_runs" in eval_result assert eval_result["experiment_id"] == initial_result["experiment_id"] assert eval_result["dataset_id"] == dataset.id assert len(eval_result["task_runs"]) == 3 # Same task runs as before assert len(eval_result["evaluation_runs"]) > 0 # Now we have evaluations # Verify that we have evaluations for each task run and each evaluator expected_eval_runs = len(eval_result["task_runs"]) * 2 # 2 evaluators assert len(eval_result["evaluation_runs"]) == expected_eval_runs @pytest.mark.parametrize("is_async", [True, False]) async def test_evaluation_consistency_when_implemented( self, is_async: bool, _app: _AppInfo, ) -> None: """Test that run_experiment with evaluators produces same results as separate evaluation.""" # Test is now enabled since evaluate_experiment is implemented api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_consistency_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[{"text": "Hello world"}, {"text": "Python is great"}], outputs=[{"expected": "greeting"}, {"expected": "programming"}], ) ) def simple_task(input: Dict[str, Any]) -> str: text = input.get("text", "") if "Hello" in text: return "greeting" elif "Python" in text: return "programming" else: return "unknown" def accuracy_evaluator(output: str, expected: Dict[str, Any]) -> float: return 1.0 if output == expected.get("expected") else 0.0 client = Client(base_url=_app.base_url, api_key=api_key) # Method 1: Run experiment with evaluators included result_with_evals = await _await_or_return( client.experiments.run_experiment( dataset=dataset, task=simple_task, evaluators=[accuracy_evaluator], experiment_name=f"test_with_evals_{token_hex(4)}", print_summary=False, ) ) # Method 2: Run experiment without evaluators, then evaluate separately result_without_evals = await _await_or_return( client.experiments.run_experiment( dataset=dataset, task=simple_task, experiment_name=f"test_without_evals_{token_hex(4)}", print_summary=False, ) ) eval_result = await _await_or_return( client.experiments.evaluate_experiment( experiment=result_without_evals, evaluators=[accuracy_evaluator], print_summary=False, ) ) # Both methods should produce equivalent results assert len(result_with_evals["evaluation_runs"]) == len(eval_result["evaluation_runs"]) assert len(result_with_evals["task_runs"]) == len(result_without_evals["task_runs"]) # Evaluation results should be equivalent for eval1, eval2 in zip( result_with_evals["evaluation_runs"], eval_result["evaluation_runs"] ): assert eval1.name == eval2.name assert eval1.result == eval2.result @pytest.mark.parametrize("is_async", [True, False]) async def test_get_experiment_and_evaluate( self, is_async: bool, _app: _AppInfo, ) -> None: api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_get_experiment_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[ {"question": "What is 2+2?"}, {"question": "What is the capital of France?"}, ], outputs=[ {"answer": "4"}, {"answer": "Paris"}, ], metadata=[ {"category": "math"}, {"category": "geography"}, ], ) ) def simple_task(input: Dict[str, Any]) -> str: question = input.get("question", "") if "2+2" in question: return "The answer is 4" elif "capital" in question: return "The capital is Paris" else: return "I don't know" def accuracy_evaluator(output: str, expected: Dict[str, Any]) -> float: expected_answer = expected.get("answer", "") return 1.0 if expected_answer in output else 0.0 def length_evaluator(output: str) -> Dict[str, Any]: return {"score": len(output) / 20.0, "label": "length_score"} client = Client(base_url=_app.base_url, api_key=api_key) initial_result = await _await_or_return( client.experiments.run_experiment( dataset=dataset, task=simple_task, evaluators=[accuracy_evaluator], # Start with one evaluator experiment_name=f"test_get_exp_{token_hex(4)}", print_summary=False, ) ) assert "experiment_id" in initial_result assert "dataset_id" in initial_result assert "task_runs" in initial_result assert "evaluation_runs" in initial_result assert len(initial_result["task_runs"]) == 2 assert len(initial_result["evaluation_runs"]) == 2 # Should have 2 evals (1 per task run) initial_accuracy_evals = [ eval_run for eval_run in initial_result["evaluation_runs"] if eval_run.name == "accuracy_evaluator" ] assert len(initial_accuracy_evals) == 2 retrieved_experiment = await _await_or_return( client.experiments.get_experiment(experiment_id=initial_result["experiment_id"]) ) assert retrieved_experiment["experiment_id"] == initial_result["experiment_id"] assert retrieved_experiment["dataset_id"] == initial_result["dataset_id"] assert len(retrieved_experiment["task_runs"]) == len(initial_result["task_runs"]) assert len(retrieved_experiment["evaluation_runs"]) == len( initial_result["evaluation_runs"] ) assert len(retrieved_experiment["evaluation_runs"]) == 2 retrieved_accuracy_evals = [ eval_run for eval_run in retrieved_experiment["evaluation_runs"] if eval_run.name == "accuracy_evaluator" ] assert len(retrieved_accuracy_evals) == 2 task_outputs = [run["output"] for run in retrieved_experiment["task_runs"]] assert "The answer is 4" in task_outputs assert "The capital is Paris" in task_outputs final_result = await _await_or_return( client.experiments.evaluate_experiment( experiment=retrieved_experiment, evaluators=[length_evaluator], # Add a second evaluator print_summary=False, ) ) assert final_result["experiment_id"] == initial_result["experiment_id"] assert final_result["dataset_id"] == initial_result["dataset_id"] assert len(final_result["task_runs"]) == 2 # Same task runs assert len(final_result["evaluation_runs"]) == 4 final_accuracy_evals = [ eval_run for eval_run in final_result["evaluation_runs"] if eval_run.name == "accuracy_evaluator" ] assert len(final_accuracy_evals) == 2 final_length_evals = [ eval_run for eval_run in final_result["evaluation_runs"] if eval_run.name == "length_evaluator" ] assert len(final_length_evals) == 2 # Verify evaluation results for eval_run in final_accuracy_evals: assert eval_run.result is not None assert isinstance(eval_run.result, dict) assert eval_run.result.get("score") == 1.0 for eval_run in final_length_evals: assert eval_run.result is not None assert isinstance(eval_run.result, dict) assert eval_run.result.get("score") is not None assert eval_run.result.get("label") == "length_score" @pytest.mark.parametrize("is_async", [True, False]) async def test_dry_run_with_evaluate_experiment( self, is_async: bool, _app: _AppInfo, ) -> None: api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient Client = AsyncClient if is_async else SyncClient unique_name = f"test_dry_run_eval_{token_hex(4)}" dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[ {"text": "Hello world"}, {"text": "Python is great"}, ], outputs=[ {"expected": "greeting"}, {"expected": "programming"}, ], ) ) def simple_task(input: Dict[str, Any]) -> str: text = input.get("text", "") if "Hello" in text: return "greeting" elif "Python" in text: return "programming" else: return "unknown" def accuracy_evaluator(output: str, expected: Dict[str, Any]) -> float: return 1.0 if output == expected.get("expected") else 0.0 client = Client(base_url=_app.base_url, api_key=api_key) dry_run_result = await _await_or_return( client.experiments.run_experiment( dataset=dataset, task=simple_task, experiment_name=f"test_dry_run_{token_hex(4)}", dry_run=True, print_summary=False, ) ) assert dry_run_result["experiment_id"] == "DRY_RUN" assert len(dry_run_result["task_runs"]) == 1 assert len(dry_run_result["evaluation_runs"]) == 0 eval_result = await _await_or_return( client.experiments.evaluate_experiment( experiment=dry_run_result, evaluators=[accuracy_evaluator], dry_run=True, print_summary=False, ) ) assert eval_result["experiment_id"] == "DRY_RUN" assert len(eval_result["task_runs"]) == 1 assert len(eval_result["evaluation_runs"]) == 1 @pytest.mark.parametrize("is_async", [True, False]) async def test_experiment_with_dataset_splits( self, is_async: bool, _app: _AppInfo, ) -> None: """Test that experiments correctly record split_ids and populate the experiments_dataset_splits junction table.""" api_key = _app.admin_secret from phoenix.client import AsyncClient from phoenix.client import Client as SyncClient from .._helpers import _gql Client = AsyncClient if is_async else SyncClient unique_name = f"test_exp_splits_{token_hex(4)}" # Create dataset with examples dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.create_dataset( name=unique_name, inputs=[ {"question": "What is 2+2?"}, {"question": "What is the capital of France?"}, {"question": "Who wrote Python?"}, {"question": "What is recursion?"}, ], outputs=[ {"answer": "4"}, {"answer": "Paris"}, {"answer": "Guido van Rossum"}, {"answer": "A function calling itself"}, ], metadata=[ {"category": "math"}, {"category": "geography"}, {"category": "programming"}, {"category": "computer_science"}, ], ) ) assert len(dataset) == 4 example_ids = [example["id"] for example in dataset.examples] # Create splits using GraphQL # Split 1: Training set (first 2 examples) split_mutation = """ mutation($input: CreateDatasetSplitWithExamplesInput!) { createDatasetSplitWithExamples(input: $input) { datasetSplit { id name } } } """ train_split_result, _ = _gql( _app, _app.admin_secret, query=split_mutation, variables={ "input": { "name": f"{unique_name}_train", "color": "#FF0000", "exampleIds": [example_ids[0], example_ids[1]], } }, ) train_split_id = train_split_result["data"]["createDatasetSplitWithExamples"][ "datasetSplit" ]["id"] train_split_name = train_split_result["data"]["createDatasetSplitWithExamples"][ "datasetSplit" ]["name"] # Split 2: Test set (last 2 examples) test_split_result, _ = _gql( _app, _app.admin_secret, query=split_mutation, variables={ "input": { "name": f"{unique_name}_test", "color": "#00FF00", "exampleIds": [example_ids[2], example_ids[3]], } }, ) test_split_id = test_split_result["data"]["createDatasetSplitWithExamples"]["datasetSplit"][ "id" ] test_split_name = test_split_result["data"]["createDatasetSplitWithExamples"][ "datasetSplit" ]["name"] # First, verify that getting dataset with no splits filter returns ALL examples full_dataset_no_filter = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.get_dataset(dataset=dataset.id) ) assert len(full_dataset_no_filter) == 4, ( f"Expected all 4 examples with no filter, got {len(full_dataset_no_filter)}" ) assert full_dataset_no_filter._filtered_split_names == [], ( f"Expected empty split_names with no filter, got {full_dataset_no_filter._filtered_split_names}" ) # Verify all original example IDs are present full_dataset_example_ids = {example["id"] for example in full_dataset_no_filter.examples} assert full_dataset_example_ids == set(example_ids), ( "Full dataset should contain all original example IDs" ) # Define GraphQL query for verifying experiment splits (used multiple times below) verify_splits_query = """ query($experimentId: ID!) { node(id: $experimentId) { ... on Experiment { id datasetSplits { edges { node { id name } } } } } } """ # Define a simple task for the experiment (used in multiple tests below) def simple_task(input: Dict[str, Any]) -> str: question = input.get("question", "") if "2+2" in question: return "The answer is 4" elif "capital" in question: return "The capital is Paris" elif "Python" in question: return "Created by Guido van Rossum" elif "recursion" in question: return "When a function calls itself" else: return "I don't know" # Run an experiment on the full dataset (no split filter) to verify it processes all examples full_dataset_experiment = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=full_dataset_no_filter, task=simple_task, experiment_name=f"test_no_split_experiment_{token_hex(4)}", experiment_description="Test experiment with no split filter", print_summary=False, ) ) assert len(full_dataset_experiment["task_runs"]) == 4, ( f"Expected 4 task runs on full dataset, got {len(full_dataset_experiment['task_runs'])}" ) # Verify that experiment with no split filter has empty dataset_splits association no_split_exp_id = full_dataset_experiment["experiment_id"] no_split_verification, _ = _gql( _app, _app.admin_secret, query=verify_splits_query, variables={"experimentId": no_split_exp_id}, ) no_split_exp_node = no_split_verification["data"]["node"] no_split_edges = no_split_exp_node["datasetSplits"]["edges"] assert len(no_split_edges) == 0, ( f"Expected 0 splits for experiment with no filter, got {len(no_split_edges)}" ) # Get dataset filtered by train split only train_dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.get_dataset( dataset=dataset.id, splits=[train_split_name], ) ) assert len(train_dataset) == 2, ( f"Expected 2 examples in train split, got {len(train_dataset)}" ) assert train_split_name in train_dataset._filtered_split_names, ( "Train split name should be in dataset._filtered_split_names" ) # Run experiment on the filtered train dataset result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=train_dataset, task=simple_task, experiment_name=f"test_split_experiment_{token_hex(4)}", experiment_description="Test experiment with dataset splits", print_summary=False, ) ) # Verify experiment was created and ran on correct number of examples assert "experiment_id" in result assert result["experiment_id"] != "DRY_RUN" assert "dataset_id" in result assert result["dataset_id"] == dataset.id assert len(result["task_runs"]) == 2, ( f"Expected 2 task runs (train split), got {len(result['task_runs'])}" ) experiment_id = result["experiment_id"] # Query the database to verify the experiments_dataset_splits junction table is populated splits_verification, _ = _gql( _app, _app.admin_secret, query=verify_splits_query, variables={"experimentId": experiment_id}, ) experiment_node = splits_verification["data"]["node"] assert experiment_node is not None, "Experiment should exist" assert "datasetSplits" in experiment_node, "Experiment should have datasetSplits field" dataset_splits_edges = experiment_node["datasetSplits"]["edges"] assert len(dataset_splits_edges) == 1, ( f"Expected 1 split associated with experiment, got {len(dataset_splits_edges)}" ) associated_split = dataset_splits_edges[0]["node"] assert associated_split["id"] == train_split_id, ( f"Expected train split {train_split_id}, got {associated_split['id']}" ) assert associated_split["name"] == train_split_name, ( f"Expected train split name {train_split_name}, got {associated_split['name']}" ) # Test retrieving the experiment and verifying it contains split information retrieved_experiment = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.get_experiment( experiment_id=experiment_id ) ) assert retrieved_experiment["experiment_id"] == experiment_id assert retrieved_experiment["dataset_id"] == dataset.id assert len(retrieved_experiment["task_runs"]) == 2 # Now test running an experiment on multiple splits both_splits_dataset = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).datasets.get_dataset( dataset=dataset.id, splits=[train_split_name, test_split_name], ) ) assert len(both_splits_dataset) == 4, ( f"Expected 4 examples with both splits, got {len(both_splits_dataset)}" ) assert train_split_name in both_splits_dataset._filtered_split_names assert test_split_name in both_splits_dataset._filtered_split_names # Run experiment on dataset with both splits multi_split_result = await _await_or_return( Client(base_url=_app.base_url, api_key=api_key).experiments.run_experiment( dataset=both_splits_dataset, task=simple_task, experiment_name=f"test_multi_split_experiment_{token_hex(4)}", experiment_description="Test experiment with multiple dataset splits", print_summary=False, ) ) assert len(multi_split_result["task_runs"]) == 4, ( f"Expected 4 task runs (both splits), got {len(multi_split_result['task_runs'])}" ) multi_split_exp_id = multi_split_result["experiment_id"] # Verify both splits are associated with the experiment multi_splits_verification, _ = _gql( _app, _app.admin_secret, query=verify_splits_query, variables={"experimentId": multi_split_exp_id}, ) multi_exp_node = multi_splits_verification["data"]["node"] multi_splits_edges = multi_exp_node["datasetSplits"]["edges"] assert len(multi_splits_edges) == 2, ( f"Expected 2 splits associated with experiment, got {len(multi_splits_edges)}" ) # Verify both split IDs are present associated_split_ids = {edge["node"]["id"] for edge in multi_splits_edges} assert train_split_id in associated_split_ids, ( f"Train split {train_split_id} should be in associated splits" ) assert test_split_id in associated_split_ids, ( f"Test split {test_split_id} should be in associated splits" )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arize-ai/phoenix'

If you have feedback or need assistance with the MCP directory API, please join our Discord server