@arizeai/phoenix-mcp by Arize-ai (official)

test_inferences.py

""" Test inferences """ import logging import math import uuid from collections.abc import Iterator from contextlib import contextmanager from dataclasses import replace from datetime import datetime import numpy as np import pandas as pd import pytest import pytz from pandas import DataFrame, Series, Timestamp from pytest import LogCaptureFixture, raises import phoenix.inferences.errors as err from phoenix.inferences.errors import DatasetError from phoenix.inferences.inferences import ( Inferences, _normalize_timestamps, _parse_dataframe_and_schema, ) from phoenix.inferences.schema import ( EmbeddingColumnNames, RetrievalEmbeddingColumnNames, Schema, ) class TestParseDataFrameAndSchema: """ Tests for `_parse_dataframe_and_schema` """ _NUM_RECORDS = 5 _EMBEDDING_DIMENSION = 7 def test_schema_contains_all_dataframe_columns_results_in_unchanged_output( self, caplog: LogCaptureFixture, ) -> None: input_dataframe = DataFrame( { "prediction_id": [str(x) for x in range(self.num_records)], "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), "tag0": ["tag" for _ in range(self.num_records)], } ) input_schema = Schema( prediction_id_column_name="prediction_id", timestamp_column_name="timestamp", feature_column_names=["feature0", "feature1"], tag_column_names=["tag0"], prediction_label_column_name="prediction_label", prediction_score_column_name=None, actual_label_column_name=None, actual_score_column_name=None, ) self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe, expected_parsed_schema=input_schema, should_log_warning_to_user=False, caplog=caplog, ) def test_column_present_in_dataframe_but_missing_from_schema_is_dropped( self, caplog: LogCaptureFixture, ) -> None: input_dataframe = DataFrame( { "prediction_id": [str(x) for x in range(self.num_records)], "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), "tag0": ["tag" for _ in range(self.num_records)], } ) input_schema = Schema( prediction_id_column_name="prediction_id", timestamp_column_name="timestamp", feature_column_names=["feature0", "feature1"], prediction_label_column_name="prediction_label", ) self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ [col for col in input_dataframe.columns if col != "tag0"] ], expected_parsed_schema=replace(input_schema, tag_column_names=None), should_log_warning_to_user=False, caplog=caplog, ) def test_some_features_excluded_removes_excluded_features_columns_and_keeps_the_rest( self, caplog: LogCaptureFixture, ) -> None: input_dataframe = DataFrame( { "prediction_id": [str(x) for x in range(self.num_records)], "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), "tag0": ["tag" for _ in range(self.num_records)], } ) input_schema = Schema( prediction_id_column_name="prediction_id", timestamp_column_name="timestamp", feature_column_names=["feature0", "feature1"], tag_column_names=["tag0"], 
prediction_label_column_name="prediction_label", excluded_column_names=["feature1"], ) self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ ["prediction_id", "timestamp", "prediction_label", "feature0", "tag0"] ], expected_parsed_schema=replace( input_schema, prediction_label_column_name="prediction_label", feature_column_names=["feature0"], tag_column_names=["tag0"], excluded_column_names=None, ), should_log_warning_to_user=False, caplog=caplog, ) def test_all_features_and_tags_excluded_sets_schema_features_and_tags_fields_to_none( self, caplog: LogCaptureFixture, ) -> None: input_dataframe = DataFrame( { "prediction_id": [str(x) for x in range(self.num_records)], "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), "tag0": ["tag" for _ in range(self.num_records)], } ) excluded_column_names = ["feature0", "feature1", "tag0"] input_schema = Schema( prediction_id_column_name="prediction_id", timestamp_column_name="timestamp", feature_column_names=["feature0", "feature1"], tag_column_names=["tag0"], prediction_label_column_name="prediction_label", excluded_column_names=excluded_column_names, ) self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ ["prediction_id", "timestamp", "prediction_label"] ], expected_parsed_schema=replace( input_schema, prediction_label_column_name="prediction_label", feature_column_names=None, tag_column_names=None, excluded_column_names=None, ), should_log_warning_to_user=False, caplog=caplog, ) def test_excluded_single_column_schema_fields_set_to_none( self, caplog: LogCaptureFixture, ) -> None: input_dataframe = DataFrame( { "prediction_id": [str(x) for x in range(self.num_records)], "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), } ) input_schema = Schema( prediction_id_column_name="prediction_id", timestamp_column_name="timestamp", prediction_label_column_name="prediction_label", feature_column_names=["feature0", "feature1"], excluded_column_names=["prediction_label"], ) self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ ["prediction_id", "timestamp", "feature0", "feature1"] ], expected_parsed_schema=replace( input_schema, prediction_label_column_name=None, excluded_column_names=None, ), should_log_warning_to_user=False, caplog=caplog, ) def test_no_input_schema_features_and_no_excluded_column_names_discovers_features( self, caplog: LogCaptureFixture, ) -> None: input_dataframe = DataFrame( { "prediction_id": [str(x) for x in range(self.num_records)], "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), "feature2": np.ones(self.num_records) + 1, } ) input_schema = Schema( prediction_id_column_name="prediction_id", prediction_label_column_name="prediction_label", timestamp_column_name="timestamp", ) self._parse_dataframe_and_schema_and_check_output( 
    def test_no_input_schema_features_and_nonempty_excluded_column_names_discovers_features(
        self,
        caplog: LogCaptureFixture,
    ) -> None:
        input_dataframe = DataFrame(
            {
                "prediction_id": [str(x) for x in range(self.num_records)],
                "timestamp": [pd.Timestamp.now() for x in range(self.num_records)],
                "prediction_label": [f"label{index}" for index in range(self.num_records)],
                "feature0": np.zeros(self.num_records),
                "feature1": np.ones(self.num_records),
                "feature2": np.ones(self.num_records) + 1,
                "tag0": ["tag0" for _ in range(self.num_records)],
                "tag1": ["tag1" for _ in range(self.num_records)],
            }
        )
        excluded_column_names = ["prediction_label", "feature1", "tag0"]
        input_schema = Schema(
            prediction_id_column_name="prediction_id",
            timestamp_column_name="timestamp",
            tag_column_names=["tag0", "tag1"],
            prediction_label_column_name="prediction_label",
            excluded_column_names=excluded_column_names,
        )
        self._parse_dataframe_and_schema_and_check_output(
            input_dataframe=input_dataframe,
            input_schema=input_schema,
            expected_parsed_dataframe=input_dataframe[
                ["prediction_id", "timestamp", "feature0", "feature2", "tag1"]
            ],
            expected_parsed_schema=replace(
                input_schema,
                prediction_label_column_name=None,
                feature_column_names=["feature0", "feature2"],
                tag_column_names=["tag1"],
                excluded_column_names=None,
            ),
            should_log_warning_to_user=False,
            caplog=caplog,
        )

    def test_excluded_column_not_contained_in_dataframe_logs_warning(
        self,
        caplog: LogCaptureFixture,
    ) -> None:
        input_dataframe = DataFrame(
            {
                "prediction_id": [str(x) for x in range(self.num_records)],
                "timestamp": [pd.Timestamp.now() for x in range(self.num_records)],
                "prediction_label": [f"label{index}" for index in range(self.num_records)],
                "feature0": np.zeros(self.num_records),
                "feature1": np.ones(self.num_records),
                "feature2": np.ones(self.num_records) + 1,
                "tag0": ["tag0" for _ in range(self.num_records)],
                "tag1": ["tag1" for _ in range(self.num_records)],
            }
        )
        excluded_column_names = ["prediction_label", "column_not_in_dataframe"]
        input_schema = Schema(
            prediction_id_column_name="prediction_id",
            timestamp_column_name="timestamp",
            feature_column_names=["feature0", "feature1", "feature2"],
            tag_column_names=["tag0", "tag1"],
            prediction_label_column_name="prediction_label",
            excluded_column_names=excluded_column_names,
        )
        self._parse_dataframe_and_schema_and_check_output(
            input_dataframe=input_dataframe,
            input_schema=input_schema,
            expected_parsed_dataframe=input_dataframe[
                ["prediction_id", "timestamp", "feature0", "feature1", "feature2", "tag0", "tag1"]
            ],
            expected_parsed_schema=replace(
                input_schema, prediction_label_column_name=None, excluded_column_names=None
            ),
            should_log_warning_to_user=True,
            caplog=caplog,
        )

    def test_schema_includes_embedding_feature_has_all_embedding_columns_included(
        self,
        caplog: LogCaptureFixture,
    ) -> None:
        input_dataframe = DataFrame(
            {
                "prediction_id": [str(x) for x in range(self.num_records)],
                "timestamp": [pd.Timestamp.now() for x in range(self.num_records)],
                "embedding_vector0": [
                    np.zeros(self.embedding_dimension) for _ in range(self.num_records)
                ],
                "link_to_data0": [f"some-link{index}" for index in range(self.num_records)],
                "raw_data_column0": [f"some-text{index}" for index in range(self.num_records)],
            }
        )
        input_schema = Schema(
            prediction_id_column_name="prediction_id",
            timestamp_column_name="timestamp",
            embedding_feature_column_names={
                "embedding_feature0": EmbeddingColumnNames(
                    vector_column_name="embedding_vector0",
                    link_to_data_column_name="link_to_data0",
                    raw_data_column_name="raw_data_column0",
                ),
            },
        )
        self._parse_dataframe_and_schema_and_check_output(
            input_dataframe=input_dataframe,
            input_schema=input_schema,
            expected_parsed_dataframe=input_dataframe,
            expected_parsed_schema=input_schema,
            should_log_warning_to_user=False,
            caplog=caplog,
        )

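    # Prompt/response retrieval embeddings (RetrievalEmbeddingColumnNames) declare a vector
    # column plus retrieval-id columns; both should pass through parsing unchanged.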
timestamp_column_name="timestamp", embedding_feature_column_names={ "embedding_feature0": EmbeddingColumnNames( vector_column_name="embedding_vector0", link_to_data_column_name="link_to_data0", raw_data_column_name="raw_data_column0", ), }, ) self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe, expected_parsed_schema=input_schema, should_log_warning_to_user=False, caplog=caplog, ) def test_schema_includes_relationships( self, caplog: LogCaptureFixture, ) -> None: input_dataframe = DataFrame( { "prediction_id": [str(x) for x in range(self.num_records)], "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "embedding": [[1, 2, 3, 4, 5] for _ in range(self.num_records)], "document_ids": [["doc_id_1", "doc_id_4"] for _ in range(self.num_records)], } ) input_schema = Schema( prediction_id_column_name="prediction_id", timestamp_column_name="timestamp", prompt_column_names=RetrievalEmbeddingColumnNames( vector_column_name="embedding", context_retrieval_ids_column_name="document_ids", ), ) self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe, expected_parsed_schema=input_schema, should_log_warning_to_user=False, caplog=caplog, ) def test_embedding_columns_of_excluded_embedding_feature_are_removed( self, caplog: LogCaptureFixture, ) -> None: input_dataframe = DataFrame( { "prediction_id": [str(x) for x in range(self.num_records)], "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "embedding_vector0": [ np.zeros(self.embedding_dimension) for _ in range(self.num_records) ], "link_to_data0": [f"some-link{index}" for index in range(self.num_records)], "raw_data_column0": [f"some-text{index}" for index in range(self.num_records)], "embedding_vector1": [np.zeros(9) for _ in range(self.num_records)], "link_to_data1": [f"some-link{index}" for index in range(self.num_records)], "raw_data_column1": [f"some-text{index}" for index in range(self.num_records)], } ) input_schema = Schema( prediction_id_column_name="prediction_id", timestamp_column_name="timestamp", embedding_feature_column_names={ "embedding_feature0": EmbeddingColumnNames( vector_column_name="embedding_vector0", link_to_data_column_name="link_to_data0", raw_data_column_name="raw_data_column0", ), "embedding_feature1": EmbeddingColumnNames( vector_column_name="embedding_vector1", link_to_data_column_name="link_to_data1", raw_data_column_name="raw_data_column1", ), }, excluded_column_names=["embedding_feature0"], ) self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ [ "prediction_id", "timestamp", "embedding_vector1", "link_to_data1", "raw_data_column1", ] ], expected_parsed_schema=replace( input_schema, embedding_feature_column_names={ "embedding_feature1": EmbeddingColumnNames( vector_column_name="embedding_vector1", link_to_data_column_name="link_to_data1", raw_data_column_name="raw_data_column1", ) }, excluded_column_names=None, ), should_log_warning_to_user=False, caplog=caplog, ) def test_excluding_all_embedding_features_sets_schema_embedding_field_to_none( self, caplog: LogCaptureFixture, ) -> None: input_dataframe = DataFrame( { "prediction_id": [str(x) for x in range(self.num_records)], "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "embedding_vector0": [ 
    def test_excluding_an_embedding_column_rather_than_the_embedding_feature_name_logs_warning(
        self,
        caplog: LogCaptureFixture,
    ) -> None:
        input_dataframe = DataFrame(
            {
                "prediction_id": [str(x) for x in range(self.num_records)],
                "timestamp": [pd.Timestamp.now() for x in range(self.num_records)],
                "embedding_vector0": [
                    np.zeros(self.embedding_dimension) for _ in range(self.num_records)
                ],
                "link_to_data0": [f"some-link{index}" for index in range(self.num_records)],
                "raw_data_column0": [f"some-text{index}" for index in range(self.num_records)],
            }
        )
        input_schema = Schema(
            prediction_id_column_name="prediction_id",
            timestamp_column_name="timestamp",
            embedding_feature_column_names={
                "embedding_feature0": EmbeddingColumnNames(
                    vector_column_name="embedding_vector0",
                    link_to_data_column_name="link_to_data0",
                    raw_data_column_name="raw_data_column0",
                ),
            },
            excluded_column_names=["embedding_vector0"],
        )
        self._parse_dataframe_and_schema_and_check_output(
            input_dataframe=input_dataframe,
            input_schema=input_schema,
            expected_parsed_dataframe=input_dataframe,
            expected_parsed_schema=replace(
                input_schema,
                excluded_column_names=None,
            ),
            should_log_warning_to_user=True,
            caplog=caplog,
        )

    def test_excluding_embedding_feature_with_same_name_as_embedding_column_does_not_warn_user(
        self,
        caplog: LogCaptureFixture,
    ) -> None:
        input_dataframe = DataFrame(
            {
                "prediction_id": [str(x) for x in range(self.num_records)],
                "timestamp": [pd.Timestamp.now() for x in range(self.num_records)],
                "embedding0": [np.zeros(self.embedding_dimension) for _ in range(self.num_records)],
                "link_to_data0": [f"some-link{index}" for index in range(self.num_records)],
                "raw_data_column0": [f"some-text{index}" for index in range(self.num_records)],
            }
        )
        input_schema = Schema(
            prediction_id_column_name="prediction_id",
            timestamp_column_name="timestamp",
            embedding_feature_column_names={
                "embedding0": EmbeddingColumnNames(
                    vector_column_name="embedding0",
                    link_to_data_column_name="link_to_data0",
                    raw_data_column_name="raw_data_column0",
                ),
            },
            excluded_column_names=["embedding0"],
        )
        self._parse_dataframe_and_schema_and_check_output(
            input_dataframe=input_dataframe,
            input_schema=input_schema,
            expected_parsed_dataframe=input_dataframe[["prediction_id", "timestamp"]],
            expected_parsed_schema=replace(
                input_schema,
                embedding_feature_column_names=None,
                excluded_column_names=None,
            ),
            should_log_warning_to_user=False,
            caplog=caplog,
        )

    def test_inferences_coerce_vectors_from_lists_to_arrays(
        self,
        caplog: LogCaptureFixture,
    ) -> None:
        vec = np.random.random(10)
        input_dataframe = DataFrame(
            {
                "embedding0": [tuple(vec), np.nan] * 20,
                "embedding1": [list(vec), None] * 20,
                "embedding2": [list(vec), None, tuple(vec), np.nan] * 10,
            }
        )
        assert 0 < input_dataframe.isna().sum().sum() < input_dataframe.size
        emb0 = EmbeddingColumnNames(vector_column_name="embedding0")
        emb1 = EmbeddingColumnNames(vector_column_name="embedding1")
        emb2 = EmbeddingColumnNames(vector_column_name="embedding2")
        input_schema = Schema(
            embedding_feature_column_names={"embedding0": emb0},
            prompt_column_names=emb1,
            response_column_names=emb2,
        )
        parsed_dataframe, _ = _parse_dataframe_and_schema(
            dataframe=input_dataframe,
            schema=input_schema,
        )
        assert len(parsed_dataframe) == len(input_dataframe)
        for name in ("embedding0", "embedding1", "embedding2"):
            for parsed, original in zip(
                parsed_dataframe.loc[:, name],
                input_dataframe.loc[:, name],
            ):
                assert (
                    parsed is None
                    and original is None
                    or (isinstance(parsed, float) and math.isnan(parsed))
                    and (isinstance(original, float) and math.isnan(original))
                    or isinstance(parsed, np.ndarray)
                    and not isinstance(original, np.ndarray)
                    and list(parsed) == list(original)
                )

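    # Shared assertion helper: runs _parse_dataframe_and_schema on the inputs and checks the
    # parsed dataframe, the parsed schema, and whether a warning was logged to the user,
    # against the expected values.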
"embedding1": [list(vec), None] * 20, "embedding2": [list(vec), None, tuple(vec), np.nan] * 10, } ) assert 0 < input_dataframe.isna().sum().sum() < input_dataframe.size emb0 = EmbeddingColumnNames(vector_column_name="embedding0") emb1 = EmbeddingColumnNames(vector_column_name="embedding1") emb2 = EmbeddingColumnNames(vector_column_name="embedding2") input_schema = Schema( embedding_feature_column_names={"embedding0": emb0}, prompt_column_names=emb1, response_column_names=emb2, ) parsed_dataframe, _ = _parse_dataframe_and_schema( dataframe=input_dataframe, schema=input_schema, ) assert len(parsed_dataframe) == len(input_dataframe) for name in ("embedding0", "embedding1", "embedding2"): for parsed, original in zip( parsed_dataframe.loc[:, name], input_dataframe.loc[:, name], ): assert ( parsed is None and original is None or (isinstance(parsed, float) and math.isnan(parsed)) and (isinstance(original, float) and math.isnan(original)) or isinstance(parsed, np.ndarray) and not isinstance(original, np.ndarray) and list(parsed) == list(original) ) def _parse_dataframe_and_schema_and_check_output( self, input_dataframe: DataFrame, input_schema: Schema, expected_parsed_dataframe: DataFrame, expected_parsed_schema: Schema, should_log_warning_to_user: bool, caplog: LogCaptureFixture, ) -> None: parsed_dataframe, parsed_schema = _parse_dataframe_and_schema( dataframe=input_dataframe, schema=input_schema ) assert parsed_dataframe.equals(expected_parsed_dataframe) assert parsed_schema == expected_parsed_schema assert self._warning_logged(caplog) is should_log_warning_to_user @staticmethod def _warning_logged(caplog: LogCaptureFixture) -> bool: """ Scans captured logs to check whether a warning is logged to the user """ for record in caplog.records: if logging.WARNING == record.levelno: return True return False def test_inferences_normalization_columns_already_normalized(self) -> None: input_dataframe = DataFrame( { "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "timestamp": np.full( shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp ), "prediction_id": random_uuids(self.num_records), } ) input_schema = Schema( prediction_id_column_name="prediction_id", timestamp_column_name="timestamp", feature_column_names=["feature0"], prediction_label_column_name="prediction_label", ) output_dataframe, _ = _parse_dataframe_and_schema( dataframe=input_dataframe, schema=input_schema ) assert output_dataframe.equals(input_dataframe) def test_inferences_normalization_prediction_id_integer_to_string(self) -> None: input_dataframe = DataFrame( { "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "timestamp": np.full( shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp ), "prediction_id": range(self.num_records), } ) input_schema = Schema( prediction_id_column_name="prediction_id", timestamp_column_name="timestamp", feature_column_names=["feature0"], prediction_label_column_name="prediction_label", ) output_dataframe, _ = _parse_dataframe_and_schema( dataframe=input_dataframe, schema=input_schema ) expected_dataframe = input_dataframe expected_dataframe["prediction_id"] = expected_dataframe["prediction_id"].astype(str) assert output_dataframe.equals(expected_dataframe) def test_inferences_normalization_columns_add_missing_prediction_id(self) -> None: input_dataframe = DataFrame( { "prediction_label": [f"label{index}" for index in 
class TestDataset:
    _NUM_RECORDS = 9

    def test_inferences_normalization_columns_already_normalized(self) -> None:
        input_dataframe = DataFrame(
            {
                "prediction_label": [f"label{index}" for index in range(self.num_records)],
                "feature0": np.zeros(self._NUM_RECORDS),
                "timestamp": np.full(
                    shape=self._NUM_RECORDS, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp
                ),
                "prediction_id": random_uuids(self.num_records),
            }
        )
        input_schema = Schema(
            prediction_id_column_name="prediction_id",
            timestamp_column_name="timestamp",
            feature_column_names=["feature0"],
            prediction_label_column_name="prediction_label",
        )
        inferences = Inferences(dataframe=input_dataframe, schema=input_schema)
        output_dataframe = inferences.dataframe
        output_schema = inferences.schema
        assert output_dataframe.equals(
            input_dataframe.set_index("timestamp", drop=False).sort_index()
        )
        assert output_schema == input_schema

    # TODO: Move validation tests to validation module; keep one validation integration test
    def test_inferences_validate_invalid_prediction_id_datatype(self) -> None:
        input_df = DataFrame(
            {
                "prediction_label": [f"label{index}" for index in range(self.num_records)],
                "feature0": np.zeros(self.num_records),
                "prediction_id": np.full(
                    shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp
                ),
            }
        )
        input_schema = Schema(
            prediction_id_column_name="prediction_id",
            feature_column_names=["feature0"],
            prediction_label_column_name="prediction_label",
        )
        with raises_inferences_error(
            err.InvalidColumnType,
            "Invalid column types: ['prediction_id should be a string or numeric type']",
        ):
            Inferences(dataframe=input_df, schema=input_schema)

    def test_inferences_validate_invalid_schema_excludes_timestamp(self) -> None:
        input_df = DataFrame(
            {
                "prediction_label": [f"label{index}" for index in range(self.num_records)],
                "feature0": np.zeros(self.num_records),
                "timestamp": random_uuids(self.num_records),
            },
        )
        input_schema = Schema(
            timestamp_column_name="timestamp",
            feature_column_names=["feature0"],
            prediction_label_column_name="prediction_label",
            excluded_column_names=["timestamp"],
        )
        with raises_inferences_error(
            err.InvalidSchemaError,
            "The schema is invalid: timestamp cannot be excluded "
            "because it is already being used as the timestamp column.",
        ):
            Inferences(dataframe=input_df, schema=input_schema)

    def test_inferences_validate_invalid_schema_excludes_prediction_id(self) -> None:
        input_df = DataFrame(
            {
                "prediction_id": [str(x) for x in range(self.num_records)],
                "prediction_label": [f"label{index}" for index in range(self.num_records)],
                "feature0": np.zeros(self.num_records),
                "timestamp": random_uuids(self.num_records),
            },
        )
        input_schema = Schema(
            prediction_id_column_name="prediction_id",
            feature_column_names=["feature0"],
            prediction_label_column_name="prediction_label",
            excluded_column_names=["prediction_id"],
        )
        with raises_inferences_error(
            err.InvalidSchemaError,
            "The schema is invalid: prediction_id cannot be excluded because it is "
            "already being used as the prediction id column.",
        ):
            Inferences(dataframe=input_df, schema=input_schema)

prediction_label_column_name="prediction_label", excluded_column_names=["prediction_id"], ) with raises_inferences_error( err.InvalidSchemaError, "The schema is invalid: prediction_id cannot be excluded because it is " "already being used as the prediction id column.", ): Inferences(dataframe=input_df, schema=input_schema) def test_inferences_validate_invalid_schema_missing_column(self) -> None: input_df = DataFrame( { "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "timestamp": random_uuids(self.num_records), }, ) input_schema = Schema( prediction_id_column_name="prediction_id", feature_column_names=["feature0"], prediction_label_column_name="prediction_label", ) with raises_inferences_error( err.MissingColumns, "The following columns are declared in the Schema " "but are not found in the dataframe: prediction_id.", ): Inferences(dataframe=input_df, schema=input_schema) @property def num_records(self) -> int: return self._NUM_RECORDS @contextmanager def raises_inferences_error( validation_error_type: type[BaseException], message: str, ) -> Iterator[None]: with raises(DatasetError) as exc_info: yield assert [type(error) for error in exc_info.value.errors] == [validation_error_type] [error] = exc_info.value.errors assert str(error) == str(exc_info.value) == message def random_uuids(num_records: int) -> list[str]: return [str(uuid.uuid4()) for _ in range(num_records)] @pytest.mark.parametrize( "input_dataframe, input_schema, default_timestamp, expected_dataframe, expected_schema", [ pytest.param( DataFrame( { "prediction_id": [1, 2, 3], } ), Schema(prediction_id_column_name="prediction_id"), Timestamp(year=2022, month=1, day=1, hour=0, minute=0, second=0, tzinfo=pytz.utc), DataFrame( { "prediction_id": [1, 2, 3], "timestamp": Series( [ Timestamp( year=2022, month=1, day=1, hour=0, minute=0, second=0, ) ] * 3 ).dt.tz_localize(pytz.utc), } ), Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"), id="missing_timestamp_updates_schema_and_adds_default_timestamp_column_to_dataframe", ), pytest.param( DataFrame( { "timestamp": [ Timestamp(year=2022, month=1, day=1, hour=0, minute=0, second=0), Timestamp(year=2022, month=1, day=2, hour=0, minute=0, second=0), Timestamp(year=2022, month=1, day=3, hour=0, minute=0, second=0), ], "prediction_id": [1, 2, 3], } ), Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"), None, DataFrame( { "timestamp": Series( [ Timestamp( year=2022, month=1, day=1, hour=0, minute=0, second=0, ), Timestamp( year=2022, month=1, day=2, hour=0, minute=0, second=0, ), Timestamp( year=2022, month=1, day=3, hour=0, minute=0, second=0, ), ] ) .dt.tz_localize(datetime.now().astimezone().tzinfo) .dt.tz_convert(pytz.utc), "prediction_id": [1, 2, 3], } ), Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"), id="tz_naive_timestamps_localized_to_local_time_zone_converted_to_utc", ), pytest.param( DataFrame( { "timestamp": Series( [ Timestamp( year=2022, month=1, day=1, hour=0, minute=0, second=0, ), Timestamp( year=2022, month=1, day=2, hour=1, minute=0, second=0, ), Timestamp( year=2022, month=1, day=3, hour=2, minute=0, second=0, ), ] ).dt.tz_localize(pytz.utc), "prediction_id": [1, 2, 3], } ), Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"), None, DataFrame( { "timestamp": Series( [ Timestamp( year=2022, month=1, day=1, hour=0, minute=0, second=0, ), Timestamp( year=2022, month=1, 
@contextmanager
def raises_inferences_error(
    validation_error_type: type[BaseException],
    message: str,
) -> Iterator[None]:
    with raises(DatasetError) as exc_info:
        yield
    assert [type(error) for error in exc_info.value.errors] == [validation_error_type]
    [error] = exc_info.value.errors
    assert str(error) == str(exc_info.value) == message


def random_uuids(num_records: int) -> list[str]:
    return [str(uuid.uuid4()) for _ in range(num_records)]


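# _normalize_timestamps is expected to produce a tz-aware UTC timestamp column from a variety of
# inputs: a missing column (filled with a default), tz-naive or tz-aware Timestamps, Unix epoch
# floats, and ISO 8601 strings. The parametrized cases below cover each of these.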
@pytest.mark.parametrize(
    "input_dataframe, input_schema, default_timestamp, expected_dataframe, expected_schema",
    [
        pytest.param(
            DataFrame(
                {
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(prediction_id_column_name="prediction_id"),
            Timestamp(year=2022, month=1, day=1, hour=0, minute=0, second=0, tzinfo=pytz.utc),
            DataFrame(
                {
                    "prediction_id": [1, 2, 3],
                    "timestamp": Series(
                        [Timestamp(year=2022, month=1, day=1, hour=0, minute=0, second=0)] * 3
                    ).dt.tz_localize(pytz.utc),
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            id="missing_timestamp_updates_schema_and_adds_default_timestamp_column_to_dataframe",
        ),
        pytest.param(
            DataFrame(
                {
                    "timestamp": [
                        Timestamp(year=2022, month=1, day=1, hour=0, minute=0, second=0),
                        Timestamp(year=2022, month=1, day=2, hour=0, minute=0, second=0),
                        Timestamp(year=2022, month=1, day=3, hour=0, minute=0, second=0),
                    ],
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            None,
            DataFrame(
                {
                    "timestamp": Series(
                        [
                            Timestamp(year=2022, month=1, day=1, hour=0, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=2, hour=0, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=3, hour=0, minute=0, second=0),
                        ]
                    )
                    .dt.tz_localize(datetime.now().astimezone().tzinfo)
                    .dt.tz_convert(pytz.utc),
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            id="tz_naive_timestamps_localized_to_local_time_zone_converted_to_utc",
        ),
        pytest.param(
            DataFrame(
                {
                    "timestamp": Series(
                        [
                            Timestamp(year=2022, month=1, day=1, hour=0, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=2, hour=1, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=3, hour=2, minute=0, second=0),
                        ]
                    ).dt.tz_localize(pytz.utc),
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            None,
            DataFrame(
                {
                    "timestamp": Series(
                        [
                            Timestamp(year=2022, month=1, day=1, hour=0, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=2, hour=1, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=3, hour=2, minute=0, second=0),
                        ]
                    ).dt.tz_localize(pytz.utc),
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            id="utc_timestamps_remain_unchanged",
        ),
        pytest.param(
            DataFrame(
                {
                    "timestamp": Series(
                        [
                            Timestamp(year=2022, month=1, day=1, hour=0, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=2, hour=1, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=3, hour=2, minute=0, second=0),
                        ]
                    )
                    .dt.tz_localize(pytz.utc)
                    .apply(lambda dt: float(dt.timestamp())),
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            None,
            DataFrame(
                {
                    "timestamp": Series(
                        [
                            Timestamp(year=2022, month=1, day=1, hour=0, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=2, hour=1, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=3, hour=2, minute=0, second=0),
                        ]
                    ).dt.tz_localize(pytz.utc),
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            id="unix_timestamps_converted_to_utc_timestamps",
        ),
        pytest.param(
            DataFrame(
                {
                    "timestamp": Series(
                        [
                            Timestamp(year=2022, month=1, day=1, hour=0, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=2, hour=1, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=3, hour=2, minute=0, second=0),
                        ]
                    ).dt.tz_localize(pytz.timezone("US/Pacific")),
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            None,
            DataFrame(
                {
                    "timestamp": Series(
                        [
                            Timestamp(year=2022, month=1, day=1, hour=8, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=2, hour=9, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=3, hour=10, minute=0, second=0),
                        ]
                    ).dt.tz_localize(pytz.utc),
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            id="us_pacific_timestamps_converted_to_utc",
        ),
        pytest.param(
            DataFrame(
                {
                    "timestamp": [
                        "2022-01-01T00:00:00Z",
                        "2022-01-02T01:00:00Z",
                        "2022-01-03T02:00:00Z",
                    ],
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            None,
            DataFrame(
                {
                    "timestamp": Series(
                        [
                            Timestamp(year=2022, month=1, day=1, hour=0, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=2, hour=1, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=3, hour=2, minute=0, second=0),
                        ]
                    ).dt.tz_localize(pytz.utc),
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            id="iso8601_utc_tz_aware_strings_converted_to_utc_timestamps",
        ),
        pytest.param(
            DataFrame(
                {
                    "timestamp": [
                        "2022-01-01T00:00:00",
                        "2022-01-02T01:00:00",
                        "2022-01-03T02:00:00",
                    ],
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            None,
            DataFrame(
                {
                    "timestamp": Series(
                        [
                            Timestamp(year=2022, month=1, day=1, hour=0, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=2, hour=1, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=3, hour=2, minute=0, second=0),
                        ]
                    )
                    .dt.tz_localize(datetime.now().astimezone().tzinfo)
                    .dt.tz_convert(pytz.utc),
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            id="iso8601_tz_naive_strings_localized_to_localTZ_converted_to_utc_timestamps",
        ),
        pytest.param(
            DataFrame(
                {
                    "timestamp": [
                        "2022-01-01T00:00:00-08:00",
                        "2022-01-02T01:00:00-08:00",
                        "2022-01-03T02:00:00-08:00",
                    ],
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            None,
            DataFrame(
                {
                    "timestamp": Series(
                        [
                            Timestamp(year=2022, month=1, day=1, hour=8, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=2, hour=9, minute=0, second=0),
                            Timestamp(year=2022, month=1, day=3, hour=10, minute=0, second=0),
                        ]
                    ).dt.tz_localize(pytz.utc),
                    "prediction_id": [1, 2, 3],
                }
            ),
            Schema(timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"),
            id="iso8601_us_pacific_strings_converted_to_utc_timestamps",
        ),
    ],
)
def test_normalize_timestamps_produces_expected_output_for_valid_input(
    input_dataframe: DataFrame,
    input_schema: Schema,
    default_timestamp: Timestamp,
    expected_dataframe: DataFrame,
    expected_schema: Schema,
) -> None:
    output_dataframe, output_schema = _normalize_timestamps(
        dataframe=input_dataframe, schema=input_schema, default_timestamp=default_timestamp
    )
    assert output_schema == expected_schema
    assert output_dataframe.equals(expected_dataframe)


def test_normalize_timestamps_raises_value_error_for_invalid_input() -> None:
    with pytest.raises(ValueError):
        _normalize_timestamps(
            dataframe=DataFrame(
                {
                    "timestamp": [
                        "24-03-2023 invalidCharacter 14:15:00",
                        "24-03-2023 invalidCharacter 10:30:00",
                        "24-03-2023 invalidCharacter 18:45:00",
                    ],
                    "prediction_id": [1, 2, 3],
                }
            ),
            schema=Schema(
                timestamp_column_name="timestamp", prediction_id_column_name="prediction_id"
            ),
            default_timestamp=Timestamp.utcnow(),
        )


def test_inferences_with_arize_schema() -> None:
    from arize.utils.types import (  # type: ignore[import-untyped]
        EmbeddingColumnNames as ArizeEmbeddingColumnNames,
    )
    from arize.utils.types import Schema as ArizeSchema

    input_df = DataFrame(
        {
            "prediction_label": ["apple", "orange", "grape"],
            "prediction_id": ["1", "2", "3"],
            "timestamp": [
                pd.Timestamp(year=2023, month=1, day=1, hour=2, second=30),
                pd.Timestamp(year=2023, month=1, day=10, hour=6, second=20),
                pd.Timestamp(year=2023, month=1, day=5, hour=4, second=25),
            ],
            "embedding": [[1, 2, 3], [4, 5, 6], [7, 8, 9]],
            "url": [
                "https://www.phoenix.com/apple.png",
                "https://www.phoenix.com/apple.png",
                "https://www.phoenix.com/apple.png",
            ],
        }
    )
    input_schema = ArizeSchema(
        prediction_id_column_name="prediction_id",
        prediction_label_column_name="prediction_label",
        timestamp_column_name="timestamp",
        embedding_feature_column_names={
            "embedding": ArizeEmbeddingColumnNames(
                vector_column_name="embedding",
                link_to_data_column_name="url",
            )
        },
    )
    inferences = Inferences(dataframe=input_df, schema=input_schema)
    assert isinstance(inferences.schema, Schema)
    assert inferences.schema.prediction_id_column_name == "prediction_id"
    assert (inferences.schema.embedding_feature_column_names or {})[
        "embedding"
    ].vector_column_name == "embedding"
