@arizeai/phoenix-mcp (official, by Arize-ai)

test_experiments.py (73.4 kB)
import json
from datetime import datetime, timezone
from io import StringIO
from secrets import token_hex
from typing import Any, Optional

import httpx
import pandas as pd
import pytest
from httpx import HTTPStatusError
from sqlalchemy import select
from strawberry.relay import GlobalID

from phoenix.db import models
from phoenix.server.api.types.node import from_global_id_with_expected_type
from phoenix.server.types import DbSessionFactory

from tests.unit._helpers import verify_experiment_examples_junction_table
from tests.unit.server.api.conftest import ExperimentsWithIncompleteRuns


async def test_experiments_api(
    httpx_client: httpx.AsyncClient,
    simple_dataset: Any,
    db: DbSessionFactory,
) -> None:
    """
    A simple test of the expected flow for the experiments API flow
    """
    dataset_gid = GlobalID("Dataset", "0")

    # first, create an experiment associated with a dataset
    created_experiment = (
        await httpx_client.post(
            f"v1/datasets/{dataset_gid}/experiments",
            json={"version_id": None, "repetitions": 1},
        )
    ).json()["data"]
    experiment_gid = created_experiment["id"]
    version_gid = created_experiment["dataset_version_id"]
    assert created_experiment["repetitions"] == 1

    dataset_examples = (
        await httpx_client.get(
            f"v1/datasets/{dataset_gid}/examples",
            params={"version_id": str(version_gid)},
        )
    ).json()["data"]["examples"]

    # Verify that the experiment examples snapshot was created in the junction table
    async with db() as session:
        await verify_experiment_examples_junction_table(session, experiment_gid)

    # experiments can be read using the GET /experiments route
    experiment = (await httpx_client.get(f"v1/experiments/{experiment_gid}")).json()["data"]
    assert experiment
    assert created_experiment["repetitions"] == 1

    # get experiment JSON before any runs - should return 404
    response = await httpx_client.get(f"v1/experiments/{experiment_gid}/json")
    assert response.status_code == 404
    assert "has no runs" in response.text

    # create experiment runs for each dataset example
    run_payload = {
        "dataset_example_id": str(dataset_examples[0]["id"]),
        "trace_id": "placeholder-id",
        "output": "some LLM application output",
        "repetition_number": 1,
        "start_time": datetime.now(timezone.utc).isoformat(),
        "end_time": datetime.now(timezone.utc).isoformat(),
        "error": "an error message, if applicable",
    }
    run_payload["id"] = (
        await httpx_client.post(
            f"v1/experiments/{experiment_gid}/runs",
            json=run_payload,
        )
    ).json()["data"]["id"]

    # get experiment JSON after runs but before evaluations
    response = await httpx_client.get(f"v1/experiments/{experiment_gid}/json")
    assert response.status_code == 200
    runs = json.loads(response.text)
    assert len(runs) == 1
    run = runs[0]
    assert isinstance(run.pop("example_id"), str)
    assert run.pop("repetition_number") == 1
    assert run.pop("input") == {"in": "foo"}
    assert run.pop("reference_output") == {"out": "bar"}
    assert run.pop("output") == "some LLM application output"
    assert run.pop("error") == "an error message, if applicable"
    assert isinstance(run.pop("latency_ms"), float)
    assert isinstance(run.pop("start_time"), str)
    assert isinstance(run.pop("end_time"), str)
    assert run.pop("trace_id") == "placeholder-id"
    assert run.pop("prompt_token_count") is None
    assert run.pop("completion_token_count") is None
    assert run.pop("annotations") == []
    assert not run

    # get experiment CSV after runs but before evaluations
    response = await httpx_client.get(f"v1/experiments/{experiment_gid}/csv")
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/csv"
response.headers["content-disposition"].startswith('attachment; filename="') # Parse CSV content and verify the data csv_content = response.text df = pd.read_csv(StringIO(csv_content)) assert len(df) == 1 # Convert first row to dictionary and verify all fields row = df.iloc[0].to_dict() assert isinstance(row.pop("example_id"), str) assert row.pop("repetition_number") == 1 assert json.loads(row.pop("input")) == {"in": "foo"} assert json.loads(row.pop("reference_output")) == {"out": "bar"} assert row.pop("output") == "some LLM application output" assert row.pop("error") == "an error message, if applicable" assert isinstance(row.pop("latency_ms"), float) assert isinstance(row.pop("start_time"), str) assert isinstance(row.pop("end_time"), str) assert row.pop("trace_id") == "placeholder-id" assert pd.isna(row.pop("prompt_token_count")) assert pd.isna(row.pop("completion_token_count")) assert not row # experiment runs can be listed for evaluations experiment_runs = (await httpx_client.get(f"v1/experiments/{experiment_gid}/runs")).json()[ "data" ] assert experiment_runs assert len(experiment_runs) == 1 # each experiment run can be evaluated evaluation_payload = { "experiment_run_id": run_payload["id"], "trace_id": "placeholder-id", "name": "some_evaluation_name", "annotator_kind": "LLM", "result": { "label": "some label", "score": 0.5, "explanation": "some explanation", "metadata": {"some": "metadata"}, }, "error": "an error message, if applicable", "start_time": datetime.now(timezone.utc).isoformat(), "end_time": datetime.now(timezone.utc).isoformat(), } experiment_evaluation = ( await httpx_client.post("v1/experiment_evaluations", json=evaluation_payload) ).json() assert experiment_evaluation # get experiment JSON after adding evaluations response = await httpx_client.get(f"v1/experiments/{experiment_gid}/json") assert response.status_code == 200 runs = json.loads(response.text) assert len(runs) == 1 assert len(runs[0]["annotations"]) == 1 annotation = runs[0]["annotations"][0] assert annotation.pop("name") == "some_evaluation_name" assert annotation.pop("label") == "some label" assert annotation.pop("score") == 0.5 assert annotation.pop("explanation") == "some explanation" assert annotation.pop("metadata") == {} assert annotation.pop("annotator_kind") == "LLM" assert annotation.pop("trace_id") == "placeholder-id" assert annotation.pop("error") == "an error message, if applicable" assert isinstance(annotation.pop("start_time"), str) assert isinstance(annotation.pop("end_time"), str) assert not annotation # get experiment CSV after evaluations response = await httpx_client.get(f"v1/experiments/{experiment_gid}/csv") assert response.status_code == 200 assert response.headers["content-type"] == "text/csv" assert response.headers["content-disposition"].startswith('attachment; filename="') # Parse CSV content and verify the data with annotations csv_content = response.text df = pd.read_csv(StringIO(csv_content)) assert len(df) == 1 # Verify base fields row = df.iloc[0].to_dict() assert isinstance(row.pop("example_id"), str) assert row.pop("repetition_number") == 1 assert json.loads(row.pop("input")) == {"in": "foo"} assert json.loads(row.pop("reference_output")) == {"out": "bar"} assert row.pop("output") == "some LLM application output" assert row.pop("error") == "an error message, if applicable" assert isinstance(row.pop("latency_ms"), float) assert isinstance(row.pop("start_time"), str) assert isinstance(row.pop("end_time"), str) assert row.pop("trace_id") == "placeholder-id" assert 
    assert pd.isna(row.pop("prompt_token_count"))
    assert pd.isna(row.pop("completion_token_count"))

    # Verify annotation fields
    annotation_prefix = "annotation_some_evaluation_name"
    assert row.pop(f"{annotation_prefix}_label") == "some label"
    assert row.pop(f"{annotation_prefix}_score") == 0.5
    assert row.pop(f"{annotation_prefix}_explanation") == "some explanation"
    assert json.loads(row.pop(f"{annotation_prefix}_metadata")) == {}
    assert row.pop(f"{annotation_prefix}_annotator_kind") == "LLM"
    assert row.pop(f"{annotation_prefix}_trace_id") == "placeholder-id"
    assert row.pop(f"{annotation_prefix}_error") == "an error message, if applicable"
    assert isinstance(row.pop(f"{annotation_prefix}_start_time"), str)
    assert isinstance(row.pop(f"{annotation_prefix}_end_time"), str)
    assert not row


async def test_experiment_404s_with_missing_dataset(
    httpx_client: httpx.AsyncClient,
    simple_dataset: Any,
) -> None:
    incorrect_dataset_gid = GlobalID("Dataset", "1")
    response = await httpx_client.post(
        f"v1/datasets/{incorrect_dataset_gid}/experiments", json={"version_id": None}
    )
    assert response.status_code == 404


async def test_experiment_404s_with_missing_version(
    httpx_client: httpx.AsyncClient,
    simple_dataset: Any,
) -> None:
    correct_dataset_gid = GlobalID("Dataset", "0")
    incorrect_version_gid = GlobalID("DatasetVersion", "9000")
    response = await httpx_client.post(
        f"v1/datasets/{correct_dataset_gid}/experiments",
        json={"version_id": str(incorrect_version_gid)},
    )
    assert response.status_code == 404


async def test_reading_experiments(
    httpx_client: httpx.AsyncClient,
    dataset_with_experiments_without_runs: Any,
) -> None:
    experiment_gid = GlobalID("Experiment", "0")
    dataset_gid = GlobalID("Dataset", "1")
    dataset_version_gid = GlobalID("DatasetVersion", "1")
    response = await httpx_client.get(f"v1/experiments/{experiment_gid}")
    assert response.status_code == 200
    experiment = response.json()["data"]
    assert "created_at" in experiment
    assert "updated_at" in experiment
    expected = {
        "id": str(experiment_gid),
        "dataset_id": str(dataset_gid),
        "dataset_version_id": str(dataset_version_gid),
        "metadata": {"info": "a test experiment"},
    }
    assert all(experiment[key] == value for key, value in expected.items())


async def test_listing_experiments_on_empty_dataset(
    httpx_client: httpx.AsyncClient,
    dataset_with_experiments_without_runs: Any,
) -> None:
    dataset_gid = GlobalID("Dataset", "0")
    response = await httpx_client.get(f"v1/datasets/{dataset_gid}/experiments")
    assert response.status_code == 200
    experiments = response.json()["data"]
    [experiment["id"] for experiment in experiments]
    assert len(experiments) == 0, "Both experiments are associated with Dataset with ID 1"


async def test_listing_experiments_by_dataset(
    httpx_client: httpx.AsyncClient,
    dataset_with_experiments_without_runs: Any,
) -> None:
    dataset_gid = GlobalID("Dataset", "1")
    experiment_gid_0 = GlobalID("Experiment", "0")
    experiment_gid_1 = GlobalID("Experiment", "1")
    response = await httpx_client.get(f"v1/datasets/{dataset_gid}/experiments")
    assert response.status_code == 200
    experiments = response.json()["data"]
    experiment_gids = [experiment["id"] for experiment in experiments]
    assert len(experiments) == 2
    assert str(experiment_gid_1) == experiment_gids[0], "experiments are listed newest first"
    assert str(experiment_gid_0) == experiment_gids[1], "experiments are listed newest first"


async def test_deleting_dataset_also_deletes_experiments(
    httpx_client: httpx.AsyncClient,
    dataset_with_experiments_runs_and_evals: Any,
) -> None:
str(1))}" exp_url = f"v1/experiments/{GlobalID('Experiment', str(1))}" runs_url = f"{exp_url}/runs" (await httpx_client.get(exp_url)).raise_for_status() assert len((await httpx_client.get(runs_url)).json()["data"]) > 0 (await httpx_client.delete(ds_url)).raise_for_status() assert len((await httpx_client.get(runs_url)).json()["data"]) == 0 with pytest.raises(HTTPStatusError): (await httpx_client.get(exp_url)).raise_for_status() async def test_experiment_runs_pagination( httpx_client: httpx.AsyncClient, simple_dataset: Any, ) -> None: """Test pagination functionality for experiment runs endpoint.""" dataset_gid = GlobalID("Dataset", "0") # Create experiment and runs experiment = ( await httpx_client.post( f"v1/datasets/{dataset_gid}/experiments", json={"version_id": None, "repetitions": 1}, ) ).json()["data"] dataset_examples = ( await httpx_client.get( f"v1/datasets/{dataset_gid}/examples", params={"version_id": str(experiment["dataset_version_id"])}, ) ).json()["data"]["examples"] # Create 5 runs for pagination testing created_runs = [] for i in range(5): run = ( await httpx_client.post( f"v1/experiments/{experiment['id']}/runs", json={ "dataset_example_id": str(dataset_examples[0]["id"]), "trace_id": f"trace-{i}", "output": f"output-{i}", "repetition_number": i + 1, "start_time": datetime.now(timezone.utc).isoformat(), "end_time": datetime.now(timezone.utc).isoformat(), }, ) ).json()["data"] created_runs.append(run["id"]) def get_numeric_ids(run_ids: list[str]) -> list[int]: """Helper to extract numeric IDs for comparison.""" return [int(GlobalID.from_id(run_id).node_id) for run_id in run_ids] # Expected order: descending by numeric ID expected_ids = sorted(get_numeric_ids(created_runs), reverse=True) # [5, 4, 3, 2, 1] async def get_runs(limit: Optional[int] = None, cursor: Optional[str] = None) -> dict[str, Any]: """Helper to fetch runs with optional pagination.""" params: dict[str, Any] = {} if limit is not None: params["limit"] = limit if cursor is not None: params["cursor"] = cursor response = await httpx_client.get(f"v1/experiments/{experiment['id']}/runs", params=params) assert response.status_code == 200 return response.json() # type: ignore[no-any-return] # Test: No pagination (backward compatibility) all_runs = await get_runs() assert len(all_runs["data"]) == 5 assert all_runs["next_cursor"] is None all_runs_ids = [run["id"] for run in all_runs["data"]] assert get_numeric_ids(all_runs_ids) == expected_ids # Test: Page-by-page pagination with exact content validation page1 = await get_runs(limit=2) assert len(page1["data"]) == 2 assert page1["next_cursor"] is not None page1_ids = get_numeric_ids([run["id"] for run in page1["data"]]) assert page1_ids == expected_ids[:2] # [5, 4] assert GlobalID.from_id(page1["next_cursor"]).node_id == str(expected_ids[2]) # "3" page2 = await get_runs(limit=2, cursor=page1["next_cursor"]) assert len(page2["data"]) == 2 assert page2["next_cursor"] is not None page2_ids = get_numeric_ids([run["id"] for run in page2["data"]]) assert page2_ids == expected_ids[2:4] # [3, 2] assert GlobalID.from_id(page2["next_cursor"]).node_id == str(expected_ids[4]) # "1" page3 = await get_runs(limit=2, cursor=page2["next_cursor"]) assert len(page3["data"]) == 1 assert page3["next_cursor"] is None page3_ids = get_numeric_ids([run["id"] for run in page3["data"]]) assert page3_ids == expected_ids[4:5] # [1] # Test: Aggregated pagination equals non-paginated paginated_ids = page1_ids + page2_ids + page3_ids assert paginated_ids == expected_ids paginated_run_ids = 
[run["id"] for run in page1["data"] + page2["data"] + page3["data"]] assert paginated_run_ids == all_runs_ids # Test: Large limit (no pagination) large_limit = await get_runs(limit=100) assert len(large_limit["data"]) == 5 assert large_limit["next_cursor"] is None assert get_numeric_ids([run["id"] for run in large_limit["data"]]) == expected_ids # Test: Invalid cursor response = await httpx_client.get( f"v1/experiments/{experiment['id']}/runs", params={"limit": 2, "cursor": "invalid-cursor"} ) assert response.status_code == 422 class TestExperimentCounts: """ Test suite for experiment count fields (example_count, successful_run_count, failed_run_count, and missing_run_count). Validates that counts are accurate across all experiment endpoints (create, get, list). """ @staticmethod async def _get_experiment( httpx_client: httpx.AsyncClient, experiment_gid: GlobalID ) -> dict[str, Any]: """Helper to fetch experiment data.""" response = await httpx_client.get(f"v1/experiments/{experiment_gid}") assert response.status_code == 200 return response.json()["data"] # type: ignore[no-any-return] @staticmethod async def _create_run( httpx_client: httpx.AsyncClient, experiment_gid: GlobalID, example_gid: GlobalID, repetition_number: int, trace_id: str, output: str, error: Optional[str] = None, ) -> None: """Helper to create an experiment run.""" await httpx_client.post( f"v1/experiments/{experiment_gid}/runs", json={ "dataset_example_id": str(example_gid), "trace_id": trace_id, "output": output, "repetition_number": repetition_number, "start_time": datetime.now(timezone.utc).isoformat(), "end_time": datetime.now(timezone.utc).isoformat(), **({"error": error} if error else {}), }, ) async def test_comprehensive_count_scenarios( self, httpx_client: httpx.AsyncClient, experiments_with_incomplete_runs: ExperimentsWithIncompleteRuns, ) -> None: """ Comprehensive test for example_count, successful_run_count, failed_run_count, and missing_run_count fields. Scenarios tested: 1. Mixed runs (v1) - some successful, some failed, some missing 2. No runs at all (v1) - zero successful and failed runs, all missing 3. Deleted examples (v2) - handles dataset versioning with deletions 4. Incremental additions (v2) - successful -> failed -> successful progression 5. List endpoint - multiple experiments with correct counts 6. Create endpoint - returns correct initial counts 7. All runs failed - edge case where runs exist but successful_run_count = 0 8. 
        8. Simple boundary - minimal viable case (1 repetition, 1 successful run)

        missing_run_count is calculated as:
        (example_count × repetitions) - successful_run_count - failed_run_count
        """
        dataset = experiments_with_incomplete_runs.dataset
        exp_v1_mixed = experiments_with_incomplete_runs.experiment_v1_mixed
        exp_v1_empty = experiments_with_incomplete_runs.experiment_v1_empty
        exp_v2_deletion = experiments_with_incomplete_runs.experiment_v2_with_deletion
        exp_v2_incremental = experiments_with_incomplete_runs.experiment_v2_incremental
        examples = experiments_with_incomplete_runs.examples_in_v1

        # Convert to GlobalIDs
        dataset_gid = GlobalID("Dataset", str(dataset.id))
        exp_v1_mixed_gid = GlobalID("Experiment", str(exp_v1_mixed.id))
        exp_v1_empty_gid = GlobalID("Experiment", str(exp_v1_empty.id))
        exp_v2_deletion_gid = GlobalID("Experiment", str(exp_v2_deletion.id))
        exp_v2_incremental_gid = GlobalID("Experiment", str(exp_v2_incremental.id))

        # ===== Test 1: Experiment with mixed successful and failed runs (v1) =====
        # exp_v1_mixed: has 5 examples, 7 successful runs, 3 failed runs, 3 repetitions
        # Total expected: 5 × 3 = 15 runs
        # (ex0: 3 successful, ex1: 1 successful + 1 failed,
        # ex2: 0 runs, ex3: 2 successful + 1 failed, ex4: 1 successful + 1 failed)
        exp1_data = await self._get_experiment(httpx_client, exp_v1_mixed_gid)
        assert exp1_data["example_count"] == 5, "exp_v1_mixed should have 5 examples"
        assert exp1_data["successful_run_count"] == 7, (
            "exp_v1_mixed should have 7 successful runs (3+1+0+2+1)"
        )
        assert exp1_data["failed_run_count"] == 3, (
            "exp_v1_mixed should have 3 failed runs (0+1+0+1+1)"
        )
        assert exp1_data["missing_run_count"] == 5, (
            "exp_v1_mixed should have 5 missing runs (15 total - 7 successful - 3 failed)"
        )

        # ===== Test 2: Experiment with no runs at all (v1) =====
        # exp_v1_empty: 5 examples, 2 repetitions = 10 total expected runs
        exp2_data = await self._get_experiment(httpx_client, exp_v1_empty_gid)
        assert exp2_data["example_count"] == 5, "exp_v1_empty should have 5 examples"
        assert exp2_data["successful_run_count"] == 0, "exp_v1_empty should have 0 successful runs"
        assert exp2_data["failed_run_count"] == 0, "exp_v1_empty should have 0 failed runs"
        assert exp2_data["missing_run_count"] == 10, (
            "exp_v1_empty should have 10 missing runs (5 examples × 2 repetitions)"
        )

        # ===== Test 3: Experiment with deleted example in v2 =====
        # exp_v2_deletion: has 4 examples (ex2 deleted from v2), 4 successful runs, 1 failed, 2 repetitions
        # Total expected: 4 × 2 = 8 runs
        exp3_data = await self._get_experiment(httpx_client, exp_v2_deletion_gid)
        assert exp3_data["example_count"] == 4, (
            "exp_v2_deletion should have 4 examples (ex2 deleted)"
        )
        assert exp3_data["successful_run_count"] == 4, (
            "exp_v2_deletion should have 4 successful runs (2+1+0+1)"
        )
        assert exp3_data["failed_run_count"] == 1, (
            "exp_v2_deletion should have 1 failed run (0+1+0+0)"
        )
        assert exp3_data["missing_run_count"] == 3, (
            "exp_v2_deletion should have 3 missing runs (8 total - 4 successful - 1 failed)"
        )

        # ===== Test 4: Fresh experiment (v2), then incrementally add runs =====
        # exp_v2_incremental: has 2 examples, 3 repetitions = 6 total expected runs
        exp4_data = await self._get_experiment(httpx_client, exp_v2_incremental_gid)
        assert exp4_data["example_count"] == 2, "exp_v2_incremental should have 2 examples"
        assert exp4_data["successful_run_count"] == 0, (
            "exp_v2_incremental should start with 0 successful runs"
        )
        assert exp4_data["failed_run_count"] == 0, (
            "exp_v2_incremental should start with 0 failed runs"
        )
exp4_data["missing_run_count"] == 6, ( "exp_v2_incremental should start with 6 missing runs (2 × 3)" ) # Add a successful run for the first example example_gid_0 = GlobalID("DatasetExample", str(examples[0].id)) await self._create_run( httpx_client, exp_v2_incremental_gid, example_gid_0, 1, "test-trace-1", "success output" ) # Verify count increased after successful run exp4_data = await self._get_experiment(httpx_client, exp_v2_incremental_gid) assert exp4_data["example_count"] == 2 assert exp4_data["successful_run_count"] == 1, ( "Should have 1 successful run after adding one" ) assert exp4_data["failed_run_count"] == 0, "Should still have 0 failed runs" assert exp4_data["missing_run_count"] == 5, "Should have 5 missing runs (6 - 1)" # Add a failed run for the first example (different repetition) await self._create_run( httpx_client, exp_v2_incremental_gid, example_gid_0, 2, "test-trace-2", "error output", error="Test error occurred", ) # Verify failed run doesn't increment successful_run_count but decrements missing_run_count exp4_data = await self._get_experiment(httpx_client, exp_v2_incremental_gid) assert exp4_data["example_count"] == 2 assert exp4_data["successful_run_count"] == 1, ( "Failed run should not increment successful count" ) assert exp4_data["failed_run_count"] == 1, "Should have 1 failed run after adding one" assert exp4_data["missing_run_count"] == 4, "Should have 4 missing runs (6 - 1 - 1)" # Add another successful run await self._create_run( httpx_client, exp_v2_incremental_gid, example_gid_0, 3, "test-trace-3", "success output" ) # Verify count increased again exp4_data = await self._get_experiment(httpx_client, exp_v2_incremental_gid) assert exp4_data["example_count"] == 2 assert exp4_data["successful_run_count"] == 2, "Should have 2 successful runs now" assert exp4_data["failed_run_count"] == 1, "Should still have 1 failed run" assert exp4_data["missing_run_count"] == 3, "Should have 3 missing runs (6 - 2 - 1)" # ===== Test 5: List experiments endpoint returns all with correct counts ===== list_response = await httpx_client.get(f"v1/datasets/{dataset_gid}/experiments") assert list_response.status_code == 200 experiments_list = list_response.json()["data"] assert len(experiments_list) == 4, "Should have 4 experiments" # Find the experiments in the list (order might vary) exp1_in_list = next(e for e in experiments_list if e["id"] == str(exp_v1_mixed_gid)) exp2_in_list = next(e for e in experiments_list if e["id"] == str(exp_v1_empty_gid)) exp3_in_list = next(e for e in experiments_list if e["id"] == str(exp_v2_deletion_gid)) exp4_in_list = next(e for e in experiments_list if e["id"] == str(exp_v2_incremental_gid)) # Verify counts in list endpoint match individual GET requests assert exp1_in_list["example_count"] == 5 assert exp1_in_list["successful_run_count"] == 7 assert exp1_in_list["failed_run_count"] == 3 assert exp1_in_list["missing_run_count"] == 5 assert exp2_in_list["example_count"] == 5 assert exp2_in_list["successful_run_count"] == 0 assert exp2_in_list["failed_run_count"] == 0 assert exp2_in_list["missing_run_count"] == 10 assert exp3_in_list["example_count"] == 4 # ex2 deleted in v2 assert exp3_in_list["successful_run_count"] == 4 assert exp3_in_list["failed_run_count"] == 1 assert exp3_in_list["missing_run_count"] == 3 assert exp4_in_list["example_count"] == 2 assert exp4_in_list["successful_run_count"] == 2 assert exp4_in_list["failed_run_count"] == 1 assert exp4_in_list["missing_run_count"] == 3 # ===== Test 6: Create endpoint returns correct initial 
        # ===== Test 6: Create endpoint returns correct initial counts =====
        # Create a fresh experiment and verify the create response has correct counts
        new_exp_response = await httpx_client.post(
            f"v1/datasets/{dataset_gid}/experiments",
            json={"version_id": None, "repetitions": 1},
        )
        assert new_exp_response.status_code == 200
        new_exp_data = new_exp_response.json()["data"]

        # Verify counts in create response (not just GET)
        assert new_exp_data["example_count"] == 5, "Create response should have example_count"
        assert new_exp_data["successful_run_count"] == 0, (
            "Create response should start with 0 successful runs"
        )
        assert new_exp_data["failed_run_count"] == 0, (
            "Create response should start with 0 failed runs"
        )
        assert new_exp_data["missing_run_count"] == 5, (
            "Create response should start with 5 missing runs (5 examples × 1 repetition)"
        )

        # ===== Test 7: Edge case - All runs failed =====
        new_exp_gid = new_exp_data["id"]

        # Add only failed runs for all examples
        for i, example in enumerate(examples):
            example_gid = GlobalID("DatasetExample", str(example.id))
            await self._create_run(
                httpx_client,
                new_exp_gid,
                example_gid,
                1,
                f"all-failed-trace-{i}",
                "failed output",
                error=f"All runs failed - example {i}",
            )

        # Verify that with all runs failed, successful_run_count is still 0 but failed_run_count is 5
        all_failed_data = await self._get_experiment(httpx_client, new_exp_gid)
        assert all_failed_data["example_count"] == 5
        assert all_failed_data["successful_run_count"] == 0, (
            "All failed runs should result in 0 successful count"
        )
        assert all_failed_data["failed_run_count"] == 5, (
            "All failed runs should result in 5 failed count"
        )
        assert all_failed_data["missing_run_count"] == 0, (
            "All failed runs should result in 0 missing count"
        )

        # ===== Test 8: Simple boundary case - 1 example, 1 repetition =====
        # This is the simplest possible experiment
        simple_exp_response = await httpx_client.post(
            f"v1/datasets/{dataset_gid}/experiments",
            json={"version_id": None, "repetitions": 1},
        )
        simple_exp_data = simple_exp_response.json()["data"]
        simple_exp_gid = simple_exp_data["id"]

        # Verify simple case starts correctly
        assert simple_exp_data["example_count"] == 5
        assert simple_exp_data["successful_run_count"] == 0
        assert simple_exp_data["failed_run_count"] == 0
        assert simple_exp_data["missing_run_count"] == 5

        # Add exactly 1 successful run
        await self._create_run(
            httpx_client,
            simple_exp_gid,
            GlobalID("DatasetExample", str(examples[0].id)),
            1,
            "simple-success",
            "simple output",
        )

        # Verify count is exactly 1
        simple_data = await self._get_experiment(httpx_client, simple_exp_gid)
        assert simple_data["example_count"] == 5
        assert simple_data["successful_run_count"] == 1, (
            "Simple 1-run case should have exactly 1 successful"
        )
        assert simple_data["failed_run_count"] == 0, "Simple 1-run case should have 0 failed runs"
        assert simple_data["missing_run_count"] == 4, (
            "Simple 1-run case should have 4 missing runs (5 - 1)"
        )


class TestIncompleteRuns:
    """
    Test suite for the incomplete runs endpoint.

    Validates detection of missing and failed experiment runs with proper
    pagination and error handling.
""" @staticmethod async def _get_incomplete_runs( httpx_client: httpx.AsyncClient, experiment_gid: GlobalID, limit: Optional[int] = None, cursor: Optional[str] = None, ) -> dict[str, Any]: """Helper to fetch incomplete runs.""" params: dict[str, Any] = {} if limit is not None: params["limit"] = limit if cursor is not None: params["cursor"] = cursor response = await httpx_client.get( f"v1/experiments/{experiment_gid}/incomplete-runs", params=params ) result: dict[str, Any] = {"status_code": response.status_code} if response.status_code == 200: result.update(response.json()) else: result["text"] = response.text if response.headers.get("content-type", "").startswith("application/json"): result["json"] = response.json() return result @staticmethod async def _create_run( httpx_client: httpx.AsyncClient, experiment_gid: GlobalID, example_gid: GlobalID, repetition_number: int, trace_id: str, output: str, error: Optional[str] = None, ) -> None: """Helper to create an experiment run.""" await httpx_client.post( f"v1/experiments/{experiment_gid}/runs", json={ "dataset_example_id": str(example_gid), "trace_id": trace_id, "output": output, "repetition_number": repetition_number, "start_time": datetime.now(timezone.utc).isoformat(), "end_time": datetime.now(timezone.utc).isoformat(), **({"error": error} if error else {}), }, ) async def test_incomplete_runs( self, httpx_client: httpx.AsyncClient, experiments_with_incomplete_runs: ExperimentsWithIncompleteRuns, ) -> None: """ Comprehensive test for the /incomplete-runs endpoint. Scenarios tested: 1. Basic functionality - missing and failed runs detection 2. Dataset example data - verify all required fields are included 3. Complete examples exclusion - examples with all runs complete are excluded 4. Pagination - multiple pages with limit=2 5. No duplicates - verify pagination doesn't return duplicate examples 6. Invalid experiment ID - returns 404 error 7. Invalid cursor - returns 422 error 8. Repetitions=1 optimization - test incomplete runs with repetitions=1 9. 
        9. All runs complete - edge case where no incomplete runs exist (empty result)
        """
        experiment_gid = GlobalID(
            "Experiment", str(experiments_with_incomplete_runs.experiment_v1_mixed.id)
        )
        dataset_id = experiments_with_incomplete_runs.dataset.id
        example_id_map = experiments_with_incomplete_runs.example_id_map

        # ===== Test 1: Basic functionality - includes missing and failed runs =====
        result = await self._get_incomplete_runs(httpx_client, experiment_gid)
        assert result["status_code"] == 200

        # Expected:
        # example 1: [2, 3] (failed, missing)
        # example 2: [1, 2, 3] (all missing)
        # example 3: [3] (failed)
        # example 4: [1, 2] (failed, missing)
        # Total: 4 examples with incomplete runs
        assert len(result["data"]) == 4, "Should have 4 incomplete examples"

        # Build a mapping of example_id to repetition_numbers
        incomplete_by_example = {
            int(GlobalID.from_id(run["dataset_example"]["id"]).node_id): run["repetition_numbers"]
            for run in result["data"]
        }
        assert incomplete_by_example[example_id_map[1]] == [2, 3]
        assert incomplete_by_example[example_id_map[2]] == [1, 2, 3]
        assert incomplete_by_example[example_id_map[3]] == [3]
        assert incomplete_by_example[example_id_map[4]] == [1, 2]

        # ===== Test 2: Verify dataset example data is included =====
        for incomplete_run in result["data"]:
            assert "dataset_example" in incomplete_run
            assert "id" in incomplete_run["dataset_example"]
            assert "input" in incomplete_run["dataset_example"]
            assert "output" in incomplete_run["dataset_example"]

        # ===== Test 2.1: Verify correct revision snapshot (not latest revision) =====
        # The fixture has ex3 modified in v2 (ex3-v2-patched), but experiment_v1_mixed
        # was created with v1, so it should return v1 data (ex3-v1), not v2 data.
        ex3_incomplete = next(
            (
                run
                for run in result["data"]
                if int(GlobalID.from_id(run["dataset_example"]["id"]).node_id) == example_id_map[3]
            ),
            None,
        )
        assert ex3_incomplete is not None, "Example 3 should be in incomplete runs"

        # Verify snapshot data is v1 (not v2)
        assert ex3_incomplete["dataset_example"]["input"] == {"query": "ex3-v1"}, (
            f"Expected v1 snapshot data 'ex3-v1', but got "
            f"{ex3_incomplete['dataset_example']['input']!r}. "
            "This suggests the query is returning the latest revision instead of the snapshot."
        )
        assert ex3_incomplete["dataset_example"]["output"] == {"response": "expected-3-v1"}, (
            f"Expected v1 snapshot output 'expected-3-v1', but got "
            f"{ex3_incomplete['dataset_example']['output']!r}"
        )

        # ===== Test 3: Complete examples are excluded =====
        example_ids = [
            int(GlobalID.from_id(run["dataset_example"]["id"]).node_id) for run in result["data"]
        ]
        assert example_id_map[0] not in example_ids, "Complete examples should be excluded"

        # ===== Test 4: Pagination with limit=2 =====
        page1_data = await self._get_incomplete_runs(httpx_client, experiment_gid, limit=2)
        assert page1_data["status_code"] == 200
        assert len(page1_data["data"]) == 2, "Page 1 should have 2 examples"
        assert page1_data["next_cursor"] is not None, "Should have next page"

        # Get page 2
        page2_data = await self._get_incomplete_runs(
            httpx_client, experiment_gid, limit=2, cursor=page1_data["next_cursor"]
        )
        assert page2_data["status_code"] == 200
        assert len(page2_data["data"]) == 2, "Page 2 should have 2 examples"
        assert page2_data["next_cursor"] is None, "Should be last page"

        # ===== Test 5: Verify no duplicates across pages =====
        page1_ids = [run["dataset_example"]["id"] for run in page1_data["data"]]
        page2_ids = [run["dataset_example"]["id"] for run in page2_data["data"]]
        all_example_ids = page1_ids + page2_ids
        assert len(set(all_example_ids)) == 4, "Should have 4 unique examples"
        assert len(all_example_ids) == len(set(all_example_ids)), "Should have no duplicates"

        # ===== Test 6: Invalid experiment ID returns 404 =====
        invalid_experiment_gid = GlobalID("Experiment", "99999")
        invalid_result = await self._get_incomplete_runs(httpx_client, invalid_experiment_gid)
        assert invalid_result["status_code"] == 404, "Invalid experiment should return 404"

        # ===== Test 7: Invalid cursor returns 422 =====
        # Reuse the existing dataset to create a new experiment for cursor validation
        dataset_gid = GlobalID("Dataset", str(dataset_id))
        new_experiment = (
            await httpx_client.post(
                f"v1/datasets/{dataset_gid}/experiments",
                json={"version_id": None, "repetitions": 1},
            )
        ).json()["data"]
        invalid_cursor_result = await self._get_incomplete_runs(
            httpx_client, new_experiment["id"], cursor="invalid-cursor"
        )
        assert invalid_cursor_result["status_code"] == 422, "Invalid cursor should return 422"

        # ===== Test 8: Experiment with repetitions=1 (optimization path) =====
        # Create experiment with repetitions=1 to test the optimization case where
        # there can be no "partially complete" examples
        rep1_experiment = (
            await httpx_client.post(
                f"v1/datasets/{dataset_gid}/experiments",
                json={"version_id": None, "repetitions": 1},
            )
        ).json()["data"]

        # Get the examples for this experiment to understand what we're working with
        examples_response = await httpx_client.get(
            f"v1/datasets/{dataset_gid}/examples",
            params={"version_id": str(rep1_experiment["dataset_version_id"])},
        )
        examples = examples_response.json()["data"]["examples"]

        # Pick 3 examples to test with
        assert len(examples) >= 3, f"Need at least 3 examples, got {len(examples)}"

        # Pick the first 3 examples we can find
        test_examples = examples[:3]
        complete_example_id = test_examples[0]["id"]
        missing_example_id = test_examples[1]["id"]
        failed_example_id = test_examples[2]["id"]

        # Setup: example[0]=complete, example[1]=missing (no run), example[2]=failed
        await self._create_run(
            httpx_client,
            rep1_experiment["id"],
            complete_example_id,
            1,
            f"trace-complete-{complete_example_id}",
            "success",
            error=None,
        )
        # example[1] has no runs (missing) - don't create any run
        await self._create_run(
            httpx_client,
rep1_experiment["id"], failed_example_id, 1, f"trace-failed-{failed_example_id}", "", error="Task failed", ) # Fetch incomplete runs to verify repetitions=1 optimization result = await self._get_incomplete_runs(httpx_client, rep1_experiment["id"]) assert result["status_code"] == 200 incomplete = { run["dataset_example"]["id"]: run["repetition_numbers"] for run in result["data"] } # Assertions for repetitions=1 behavior: # 1. Complete example should NOT be in incomplete results assert complete_example_id not in incomplete, ( "Complete example should not be in incomplete runs" ) # 2. Failed example SHOULD be in incomplete results with [1] assert failed_example_id in incomplete, "Failed example should be in incomplete runs" assert incomplete[failed_example_id] == [1], "Failed example should need repetition [1]" # 3. Missing example SHOULD be in incomplete results with [1] assert missing_example_id in incomplete, "Missing example should be in incomplete runs" assert incomplete[missing_example_id] == [1], "Missing example should need repetition [1]" # ===== Test 9: All runs complete - edge case (empty result) ===== # Now complete ALL runs in the repetitions=1 experiment for example in examples: # Create or update to successful if example["id"] != complete_example_id: # Skip already complete example await self._create_run( httpx_client, rep1_experiment["id"], example["id"], 1, f"complete-trace-{example['id']}", "success", ) # Verify that no incomplete runs are returned after all are complete complete_data = await self._get_incomplete_runs(httpx_client, rep1_experiment["id"]) assert complete_data["status_code"] == 200 assert len(complete_data["data"]) == 0, ( "Experiment with all runs complete should have no incomplete runs" ) assert complete_data["next_cursor"] is None, "Should have no next cursor" class TestIncompleteEvaluations: """ Comprehensive test suite for the incomplete evaluations endpoint. Tests detection of missing and failed evaluations with: - Correct filtering and categorization - Proper pagination behavior - Edge cases and boundary conditions - Error handling - Performance optimizations (JSON aggregation, error string optimization) """ @staticmethod async def _get_incomplete_evaluations( httpx_client: httpx.AsyncClient, experiment_gid: GlobalID, evaluator_names: Optional[list[str]] = None, limit: Optional[int] = None, cursor: Optional[str] = None, ) -> dict[str, Any]: """Helper to fetch incomplete evaluations.""" params: dict[str, Any] = {} if evaluator_names is not None: params["evaluation_name"] = evaluator_names if limit is not None: params["limit"] = limit if cursor is not None: params["cursor"] = cursor response = await httpx_client.get( f"v1/experiments/{experiment_gid}/incomplete-evaluations", params=params ) result: dict[str, Any] = {"status_code": response.status_code} if response.status_code == 200: result.update(response.json()) else: result["text"] = response.text if response.headers.get("content-type", "").startswith("application/json"): result["json"] = response.json() return result async def test_incomplete_evaluations( self, httpx_client: httpx.AsyncClient, experiments_with_incomplete_runs: ExperimentsWithIncompleteRuns, db: DbSessionFactory, ) -> None: """ Comprehensive test for /incomplete-evaluations endpoint. This test validates the complete lifecycle and edge cases of evaluations, organized into logical sections: I. 
        I. Core Functionality
           - Missing and failed evaluation detection
           - Response structure and data completeness
           - Filtering (complete runs excluded, task-level errors excluded)

        II. Pagination & Ordering
           - Correct ordering (by run ID ascending)
           - Pagination with various limits (1, 2, large, oversized)
           - Cursor behavior (valid, invalid, at boundaries)
           - No gaps or duplicates across pages
           - No empty pages with next_cursor (critical bug fix)

        III. Edge Cases
           - Multiple evaluation names at once
           - Duplicate evaluation names
           - Single vs multiple evaluations
           - All evaluations complete (empty result)

        IV. Error Handling
           - No evaluation names (400 error)
           - Invalid experiment ID (404 error)
           - Invalid cursor (422 error)
           - Experiment with no runs (empty result)

        V. Security
           - SQL injection attempts through evaluation_name parameter
           - Mixed malicious and legitimate names
           - Database integrity after attacks
        """
        # Setup: Get experiment and example data
        exp_v1_mixed = experiments_with_incomplete_runs.experiment_v1_mixed
        exp_v1_empty = experiments_with_incomplete_runs.experiment_v1_empty
        exp_gid = GlobalID("Experiment", str(exp_v1_mixed.id))
        exp_empty_gid = GlobalID("Experiment", str(exp_v1_empty.id))
        now = datetime.now(timezone.utc)

        # Randomized evaluation names to avoid test pollution
        eval1 = f"eval1_{token_hex(4)}"
        eval2 = f"eval2_{token_hex(4)}"
        ordering_test_eval = f"ordering_test_{token_hex(4)}"
        never_added_eval = f"never_added_{token_hex(4)}"
        single_eval_test = f"single_eval_{token_hex(4)}"
        all_complete_eval = f"all_complete_{token_hex(4)}"

        # ====================================================================================
        # SETUP: Create test data with diverse evaluation states
        # ====================================================================================
        async with db() as session:
            # Get all runs for the experiment
            runs_result = await session.execute(
                select(models.ExperimentRun)
                .where(models.ExperimentRun.experiment_id == exp_v1_mixed.id)
                .order_by(models.ExperimentRun.id)
            )
            all_runs = list(runs_result.scalars())

            # Create specific scenarios for comprehensive testing
            # Only annotate successful runs (filter out failed runs)
            successful_runs = [run for run in all_runs if run.error is None]
            assert len(successful_runs) >= 5, (
                f"Fixture must provide at least 5 successful runs, got {len(successful_runs)}"
            )

            # Run 0: Complete for eval1, missing eval2 (partially complete)
            session.add(
                models.ExperimentRunAnnotation(
                    experiment_run_id=successful_runs[0].id,
                    name=eval1,
                    annotator_kind="CODE",
                    label="success",
                    score=1.0,
                    error=None,
                    metadata_={},
                    start_time=now,
                    end_time=now,
                )
            )

            # Run 1: Complete for BOTH eval1 and eval2 (fully complete - should be filtered out!)
            for eval_name in [eval1, eval2]:
                session.add(
                    models.ExperimentRunAnnotation(
                        experiment_run_id=successful_runs[1].id,
                        name=eval_name,
                        annotator_kind="CODE",
                        label="success",
                        score=1.0,
                        error=None,
                        metadata_={},
                        start_time=now,
                        end_time=now,
                    )
                )

            # Run 2: Failed eval1, complete eval2 (partial - failed counts as incomplete)
            session.add(
                models.ExperimentRunAnnotation(
                    experiment_run_id=successful_runs[2].id,
                    name=eval1,
                    annotator_kind="CODE",
                    label=None,
                    score=None,
                    error="Evaluation failed",
                    metadata_={},
                    start_time=now,
                    end_time=now,
                )
            )
            session.add(
                models.ExperimentRunAnnotation(
                    experiment_run_id=successful_runs[2].id,
                    name=eval2,
                    annotator_kind="CODE",
                    label="success",
                    score=1.0,
                    error=None,
                    metadata_={},
                    start_time=now,
                    end_time=now,
                )
            )

            # Run 3: Missing both eval1 and eval2 (no annotations)
            # (No annotations added)

            # Run 4: Failed both eval1 and eval2
            for eval_name in [eval1, eval2]:
                session.add(
                    models.ExperimentRunAnnotation(
                        experiment_run_id=successful_runs[4].id,
                        name=eval_name,
                        annotator_kind="CODE",
                        label=None,
                        score=None,
                        error="Evaluation error",
                        metadata_={},
                        start_time=now,
                        end_time=now,
                    )
                )

            # For all remaining successful runs beyond the first 5, add complete annotations
            # This prevents the test from being polluted by extra fixture runs
            for i in range(5, len(successful_runs)):
                for eval_name in [eval1, eval2]:
                    session.add(
                        models.ExperimentRunAnnotation(
                            experiment_run_id=successful_runs[i].id,
                            name=eval_name,
                            annotator_kind="CODE",
                            label="success",
                            score=1.0,
                            error=None,
                            metadata_={},
                            start_time=now,
                            end_time=now,
                        )
                    )

            # Setup for ordering test: annotate first run with ordering_test_eval
            assert len(all_runs) > 0, "Need at least one run for ordering test setup"
            session.add(
                models.ExperimentRunAnnotation(
                    experiment_run_id=all_runs[0].id,
                    name=ordering_test_eval,
                    annotator_kind="CODE",
                    label="success",
                    score=1.0,
                    error=None,
                    metadata_={},
                    trace_id=None,
                    start_time=now,
                    end_time=now,
                )
            )

        # ====================================================================================
        # PART I: CORE FUNCTIONALITY
        # ====================================================================================

        # Test 1: Basic detection of missing and failed evaluations
        result = await self._get_incomplete_evaluations(httpx_client, exp_gid, [eval1, eval2])
        assert result["status_code"] == 200
        assert "data" in result

        # Test 2: Response structure validation
        if result["data"]:
            first_item = result["data"][0]
            assert "experiment_run" in first_item
            assert "dataset_example" in first_item
            assert "evaluation_names" in first_item
            assert isinstance(first_item["evaluation_names"], list)
            assert "id" in first_item["experiment_run"]
            assert "output" in first_item["experiment_run"]
            assert "dataset_example_id" in first_item["experiment_run"]

            # Test 2.1: Verify output consistency across endpoints
            run_id = first_item["experiment_run"]["id"]
            list_runs_response = await httpx_client.get(f"v1/experiments/{exp_gid}/runs")
            list_runs_data = list_runs_response.json()["data"]
            matching_run = next((r for r in list_runs_data if r["id"] == run_id), None)
            assert matching_run is not None, (
                f"Run {run_id} from incomplete-evaluations not found in list_experiment_runs"
            )
            # Both endpoints must return identical output for the same run
            assert first_item["experiment_run"]["output"] == matching_run["output"], (
                f"Output must be identical across endpoints: "
                f"incomplete-evaluations returned {first_item['experiment_run']['output']!r}, "
                f"list_experiment_runs returned {matching_run['output']!r}"
            )
        # Test 2.2: Verify correct revision snapshot in dataset_example
        # The fixture has ex3 modified in v2, but experiment_v1_mixed uses v1 snapshot
        example_id_map = experiments_with_incomplete_runs.example_id_map
        # Find an evaluation for ex3 (which was modified in v2)
        ex3_eval = next(
            (
                item
                for item in result["data"]
                if int(GlobalID.from_id(item["experiment_run"]["dataset_example_id"]).node_id)
                == example_id_map[3]
            ),
            None,
        )
        if ex3_eval is not None:
            # Verify snapshot data is v1 (not v2 which has "ex3-v2-patched")
            assert ex3_eval["dataset_example"]["input"] == {"query": "ex3-v1"}, (
                f"Expected v1 snapshot data 'ex3-v1', but got "
                f"{ex3_eval['dataset_example']['input']!r}. "
                "This suggests the query is returning the latest revision instead of the snapshot."
            )
            assert ex3_eval["dataset_example"]["output"] == {"response": "expected-3-v1"}, (
                f"Expected v1 snapshot output 'expected-3-v1', but got "
                f"{ex3_eval['dataset_example']['output']!r}"
            )

        # Test 2.5: Verify exactly one row per run (no duplicates from joins)
        run_ids_in_result = [item["experiment_run"]["id"] for item in result["data"]]
        assert len(run_ids_in_result) == len(set(run_ids_in_result)), (
            "Each run should appear exactly once (one row per run, not multiple rows from joins)"
        )

        # Get successful_runs again for assertions (we set them up in the db() context)
        async with db() as session:
            runs_result = await session.execute(
                select(models.ExperimentRun)
                .where(models.ExperimentRun.experiment_id == exp_v1_mixed.id)
                .where(models.ExperimentRun.error.is_(None))
                .order_by(models.ExperimentRun.id)
            )
            successful_runs = list(runs_result.scalars())

        # Test 3: Filtering - complete runs excluded
        run1_gid = str(GlobalID("ExperimentRun", str(successful_runs[1].id)))
        assert run1_gid not in run_ids_in_result, (
            "Run with all evaluations complete should be excluded"
        )

        # Test 4: Correct categorization (missing vs failed both included)
        # Build expected results map
        expected_incomplete = {
            str(GlobalID("ExperimentRun", str(successful_runs[0].id))): [
                eval2
            ],  # Complete eval1, missing eval2
            str(GlobalID("ExperimentRun", str(successful_runs[2].id))): [
                eval1
            ],  # Failed eval1, complete eval2
            str(GlobalID("ExperimentRun", str(successful_runs[3].id))): {
                eval1,
                eval2,
            },  # Missing both
            str(GlobalID("ExperimentRun", str(successful_runs[4].id))): {
                eval1,
                eval2,
            },  # Failed both
        }

        # Verify we got exactly the expected runs
        actual_run_ids = {item["experiment_run"]["id"] for item in result["data"]}
        expected_run_ids = set(expected_incomplete.keys())
        assert actual_run_ids == expected_run_ids, (
            f"Expected incomplete runs {expected_run_ids}, got {actual_run_ids}. "
" f"Missing: {expected_run_ids - actual_run_ids}, Extra: {actual_run_ids - expected_run_ids}" ) # Verify each run has correct incomplete evaluations for item in result["data"]: run_id_str = item["experiment_run"]["id"] eval_names = item["evaluation_names"] expected = expected_incomplete[run_id_str] if isinstance(expected, list): assert eval_names == expected, ( f"Run {run_id_str} should have incomplete evals {expected}, got {eval_names}" ) else: # set assert set(eval_names) == expected, ( f"Run {run_id_str} should have incomplete evals {expected}, got {set(eval_names)}" ) # Test 5: All runs missing an evaluator all_missing_result = await self._get_incomplete_evaluations( httpx_client, exp_gid, [never_added_eval] ) assert all_missing_result["status_code"] == 200 # All successful runs should be missing this evaluation (since we never added it) successful_run_count = len(successful_runs) assert len(all_missing_result["data"]) == successful_run_count, ( f"All {successful_run_count} successful runs should be missing {never_added_eval}, " f"got {len(all_missing_result['data'])}" ) # ==================================================================================== # PART II: PAGINATION & ORDERING # ==================================================================================== # Test 6: Results ordered by run ID ascending order_result = await self._get_incomplete_evaluations( httpx_client, exp_gid, [ordering_test_eval] ) assert order_result["status_code"] == 200 order_run_ids = [item["experiment_run"]["id"] for item in order_result["data"]] # Require at least 2 results to test ordering assert len(order_run_ids) >= 2, ( f"Need at least 2 results to test ordering, got {len(order_run_ids)}. " f"Ensure fixture provides multiple runs missing {ordering_test_eval}" ) # Verify strict ascending order order_rowids = [ from_global_id_with_expected_type(GlobalID.from_id(gid), "ExperimentRun") for gid in order_run_ids ] for i in range(len(order_rowids) - 1): assert order_rowids[i] < order_rowids[i + 1], ( f"Results must be in ascending order: row {order_rowids[i]} should be < {order_rowids[i + 1]}" ) # Test 7: Pagination with limit=2 # First verify the total count we expect total_incomplete = len(result["data"]) assert total_incomplete == 4, ( f"Expected exactly 4 incomplete runs (0,2,3,4), got {total_incomplete}" ) paginated_result = await self._get_incomplete_evaluations( httpx_client, exp_gid, [eval1, eval2], limit=2 ) assert paginated_result["status_code"] == 200 assert len(paginated_result["data"]) == 2, ( "Should return exactly 2 runs when limit=2 and results exist" ) assert paginated_result["next_cursor"] is not None, ( "Must have next_cursor when limit < total results" ) # Test 8: limit=1 (minimum pagination) limit1_result = await self._get_incomplete_evaluations( httpx_client, exp_gid, [eval1, eval2], limit=1 ) assert limit1_result["status_code"] == 200 assert len(limit1_result["data"]) == 1, ( "Should return exactly 1 run when limit=1 and results exist" ) assert limit1_result["next_cursor"] is not None, ( "Must have next_cursor when limit=1 < total results" ) # Test 9: Large limit exceeding total runs large_limit_result = await self._get_incomplete_evaluations( httpx_client, exp_gid, [eval1, eval2], limit=10000 ) assert large_limit_result["status_code"] == 200 assert large_limit_result.get("next_cursor") is None, ( "Should not have next_cursor when limit exceeds total" ) # Test 10: Pagination continuity - no gaps or duplicates all_paginated_runs = [] cursor = None page_count = 0 max_pages = 
        max_pages = 10  # Safety limit
        for _ in range(max_pages):
            page_result = await self._get_incomplete_evaluations(
                httpx_client, exp_gid, [eval1, eval2], limit=2, cursor=cursor
            )
            assert page_result["status_code"] == 200
            page_count += 1
            page_runs = page_result["data"]
            assert len(page_runs) > 0, (
                f"Page {page_count} must have results (empty pages violate pagination contract)"
            )
            all_paginated_runs.extend(page_runs)
            cursor = page_result.get("next_cursor")
            if cursor is None:
                break
        else:
            raise AssertionError(
                f"Pagination didn't complete within {max_pages} pages - possible infinite loop"
            )

        # Verify correct total pages (4 results / 2 per page = 2 pages)
        assert page_count == 2, f"Expected 2 pages with limit=2 and 4 results, got {page_count}"

        # Verify no duplicates
        paginated_run_ids = [item["experiment_run"]["id"] for item in all_paginated_runs]
        assert len(paginated_run_ids) == len(set(paginated_run_ids)), (
            f"Pagination must not have duplicates. Got {len(paginated_run_ids)} total, "
            f"{len(set(paginated_run_ids))} unique"
        )

        # Verify all expected runs retrieved (no gaps)
        assert set(paginated_run_ids) == set(run_ids_in_result), (
            f"Pagination must retrieve all results. "
            f"Missing: {set(run_ids_in_result) - set(paginated_run_ids)}, "
            f"Extra: {set(paginated_run_ids) - set(run_ids_in_result)}"
        )

        # Test 11: No empty pages with next_cursor (critical bug fix)
        # If we got a next_cursor in any response, the data should NOT be empty
        for page_num in range(10):
            check_result = await self._get_incomplete_evaluations(
                httpx_client,
                exp_gid,
                [eval1, eval2],
                limit=1,
                cursor=cursor if page_num > 0 else None,
            )
            if check_result.get("next_cursor"):
                assert len(check_result["data"]) > 0, (
                    f"BUG: Got next_cursor with empty data on page {page_num}! "
                    "This means SQL filtering is not working and empty pages are returned."
                )
            if not check_result.get("next_cursor"):
                break

        # ====================================================================================
        # PART III: EDGE CASES
        # ====================================================================================

        # Test 12: Single evaluation name
        single_eval_result = await self._get_incomplete_evaluations(
            httpx_client, exp_gid, [single_eval_test]
        )
        assert single_eval_result["status_code"] == 200
        # All successful runs should be missing this evaluation (since we never added it)
        assert len(single_eval_result["data"]) == successful_run_count, (
            f"All {successful_run_count} successful runs should be missing {single_eval_test}, "
            f"got {len(single_eval_result['data'])}"
        )
        for item in single_eval_result["data"]:
            assert item["evaluation_names"] == [single_eval_test], (
                f"Single evaluation request must only return that evaluation in list, got {item['evaluation_names']}"
            )

        # Test 13: Duplicate evaluation names (handled gracefully)
        duplicate_result = await self._get_incomplete_evaluations(
            httpx_client, exp_gid, [eval1, eval1, eval2, eval2]
        )
        assert duplicate_result["status_code"] == 200
        assert "data" in duplicate_result
        assert isinstance(duplicate_result["data"], list)
        # Verify duplicates are handled correctly (no crashes, valid structure)
        for item in duplicate_result["data"]:
            assert "experiment_run" in item
            assert "evaluation_names" in item
            # Each incomplete evaluation name should appear at most once per run
            assert len(item["evaluation_names"]) == len(set(item["evaluation_names"])), (
                f"Evaluation names should be deduplicated within each run: {item['evaluation_names']}"
            )
            # All evaluation names should be either eval1 or eval2
            for name in item["evaluation_names"]:
                assert name in [eval1, eval2], f"Unexpected evaluation name: {name}"

        # Test 14: All evaluations complete (empty result)
        async with db() as session:
            runs_result = await session.execute(
                select(models.ExperimentRun)
                .where(models.ExperimentRun.experiment_id == exp_v1_mixed.id)
                .where(models.ExperimentRun.error.is_(None))
                .order_by(models.ExperimentRun.id)
            )
            successful_runs = list(runs_result.scalars())
            for run in successful_runs:
                session.add(
                    models.ExperimentRunAnnotation(
                        experiment_run_id=run.id,
                        name=all_complete_eval,
                        annotator_kind="CODE",
                        label="success",
                        score=1.0,
                        error=None,
                        metadata_={},
                        start_time=now,
                        end_time=now,
                    )
                )

        all_complete_result = await self._get_incomplete_evaluations(
            httpx_client, exp_gid, [all_complete_eval]
        )
        assert all_complete_result["status_code"] == 200
        # Should return empty list (all successful runs have this evaluation complete)
        # Failed runs are excluded from results (they shouldn't be evaluated)
        assert len(all_complete_result["data"]) == 0, (
            f"When all successful runs have completed an evaluation, should return 0 results, "
            f"got {len(all_complete_result['data'])}"
        )
        assert all_complete_result.get("next_cursor") is None, (
            "Empty result should not have next_cursor"
        )

        # ====================================================================================
        # PART IV: ERROR HANDLING
        # ====================================================================================

        # Test 15: No evaluator names specified returns 400
        no_evaluator_result = await self._get_incomplete_evaluations(httpx_client, exp_gid)
        assert no_evaluator_result["status_code"] == 400
        assert "evaluation_name" in no_evaluator_result["text"].lower()

        # Test 16: Invalid experiment ID returns 404
        fake_exp_gid = GlobalID("Experiment", "999999")
        invalid_result = await self._get_incomplete_evaluations(
            httpx_client, fake_exp_gid, [eval1]
        )
        assert invalid_result["status_code"] == 404
        if "json" in invalid_result:
            assert "does not exist" in invalid_result["json"]["detail"]
        else:
            assert "does not exist" in invalid_result["text"]

        # Test 17: Invalid cursor returns 422
        invalid_cursor_result = await self._get_incomplete_evaluations(
            httpx_client, exp_gid, [eval1], cursor="invalid-cursor"
        )
        assert invalid_cursor_result["status_code"] == 422

        # Test 18: Experiment with no runs returns empty result
        non_existent_eval = f"non_existent_{token_hex(4)}"
        empty_result = await self._get_incomplete_evaluations(
            httpx_client, exp_empty_gid, [non_existent_eval]
        )
        assert empty_result["status_code"] == 200
        assert len(empty_result["data"]) == 0, "Experiment with no runs should return empty"
        assert empty_result["next_cursor"] is None

        # ====================================================================================
        # PART V: SECURITY
        # ====================================================================================

        # Test 19: SQL injection attempts through evaluation_name parameter
        sql_injection_attempts = [
            # Classic SQL injection attempts
            "'; DROP TABLE experiment_runs; --",
            "' OR '1'='1",
            "' OR 1=1--",
            "admin'--",
            "' UNION SELECT NULL--",
            # More sophisticated attempts
            "1' AND '1'='1",
            "1' UNION SELECT * FROM experiments--",
            "'; DELETE FROM experiments WHERE 1=1--",
            # Boolean-based blind SQL injection
            "' AND (SELECT COUNT(*) FROM experiments) > 0--",
            # Time-based blind SQL injection
            "'; WAITFOR DELAY '00:00:05'--",
            # PostgreSQL-specific attempts
            "'; SELECT pg_sleep(5)--",
            "' OR 1=1; --",
            # Multiple statement attempts
            "eval1'; DROP TABLE experiments; SELECT '",
            # NULL byte injection
            "eval1\x00",
            # Unicode/encoding attempts
            "eval1\u0027 OR 1=1--",
        ]

        for injection_attempt in sql_injection_attempts:
            # Test single malicious evaluation name
            result = await self._get_incomplete_evaluations(
                httpx_client, exp_gid, [injection_attempt]
            )

            # Null bytes should be rejected with 400 (invalid input)
            if "\x00" in injection_attempt:
                assert result["status_code"] == 400, (
                    f"Null byte injection should return 400 error: {injection_attempt}"
                )
                assert "null byte" in result["text"].lower(), (
                    "Error message should mention null bytes"
                )
            else:
                # Other injection attempts should return valid response (not crash)
                assert result["status_code"] == 200, (
                    f"SQL injection attempt should not cause server error: {injection_attempt}"
                )
                # Result should be empty or contain valid data structure
                assert "data" in result, "Response should have data field"
                assert isinstance(result["data"], list), "Data should be a list"

                # If there's data, verify structure is intact
                for item in result["data"]:
                    assert "experiment_run" in item
                    assert "dataset_example" in item
                    assert "evaluation_names" in item
                    assert isinstance(item["evaluation_names"], list)

        # Test 20: Mixed malicious and legitimate names
        mixed_attempt = await self._get_incomplete_evaluations(
            httpx_client,
            exp_gid,
            ["legitimate_eval", "'; DROP TABLE experiments--", "another_eval"],
        )
        assert mixed_attempt["status_code"] == 200
        assert "data" in mixed_attempt

        # Test 21: Verify database integrity after SQL injection attempts
        normal_result = await self._get_incomplete_evaluations(
            httpx_client, exp_gid, ["safe_evaluation_name"]
        )
        assert normal_result["status_code"] == 200, (
            "Database should still be functional after SQL injection attempts"
        )
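
The tests above walk through the full REST flow for an experiment: create it against a dataset version, record runs for dataset examples, attach evaluations, and export the annotated results. The standalone sketch below (not part of the test file) condenses that flow into a small client script using the same endpoints and payload shapes exercised above; the base URL, dataset GlobalID, and evaluation name are illustrative placeholders, and treating the payload fields omitted here (trace_id, error, metadata) as optional is an assumption.

# sketch_experiments_flow.py -- a minimal client-side sketch of the flow the tests exercise.
# Assumptions: a Phoenix server reachable at BASE_URL and an existing dataset GlobalID.
import asyncio
from datetime import datetime, timezone

import httpx

BASE_URL = "http://localhost:6006"  # assumption: local Phoenix server
DATASET_ID = "RGF0YXNldDox"  # assumption: GlobalID of an existing dataset


async def main() -> None:
    async with httpx.AsyncClient(base_url=BASE_URL) as client:
        # 1. Create an experiment for the dataset (same endpoint as the tests above).
        experiment = (
            await client.post(
                f"v1/datasets/{DATASET_ID}/experiments",
                json={"version_id": None, "repetitions": 1},
            )
        ).json()["data"]

        # 2. Look up the examples pinned to the experiment's dataset version
        #    and record a run for the first one.
        examples = (
            await client.get(
                f"v1/datasets/{DATASET_ID}/examples",
                params={"version_id": str(experiment["dataset_version_id"])},
            )
        ).json()["data"]["examples"]
        run = (
            await client.post(
                f"v1/experiments/{experiment['id']}/runs",
                json={
                    "dataset_example_id": str(examples[0]["id"]),
                    "trace_id": "example-trace",
                    "output": "LLM output",
                    "repetition_number": 1,
                    "start_time": datetime.now(timezone.utc).isoformat(),
                    "end_time": datetime.now(timezone.utc).isoformat(),
                },
            )
        ).json()["data"]

        # 3. Attach an evaluation to the run ("correctness" is an illustrative name).
        await client.post(
            "v1/experiment_evaluations",
            json={
                "experiment_run_id": run["id"],
                "name": "correctness",
                "annotator_kind": "LLM",
                "result": {"label": "correct", "score": 1.0, "explanation": "matches reference"},
                "start_time": datetime.now(timezone.utc).isoformat(),
                "end_time": datetime.now(timezone.utc).isoformat(),
            },
        )

        # 4. Download the annotated results as JSON (CSV is also available at .../csv).
        print((await client.get(f"v1/experiments/{experiment['id']}/json")).text)


asyncio.run(main())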
