---
description: Always follow these rules when creating or modifying Tecton unit tests
globs:
alwaysApply: false
---
# Rules to follow when you create Tecton FeatureView unit tests
CRITICAL: Before adding a new unit test, make sure that `tecton plan` works. This ensures the Feature View definitions are at least valid.
Feature retrieval methods such as `get_features_in_range` and
`run_transformation` use _validated_ Tecton Objects during retrieval by
default. Validation requires access to the Tecton API and a Compute Cluster
which are often unavailable in Unit Testing environments.
To use these methods using mock inputs **without validation** in Unit Tests, set the validation mode to `skip` using:
```
conf.set("TECTON_SKIP_OBJECT_VALIDATION", "True")
```
In your unit test for a given FeatureView, always fetch a reference to the FeatureView using the `TestRepo` test fixture. Never try to import it directly from the module. Sample code that shows how to use `repo_fixture.get_feature_view` to retrieve the unit under test:
```python
### tests/transaction_amount_is_high.py ###
from tecton import TestRepo
import pandas
# Testing the 'transaction_amount_is_high' feature which depends on request data ('amount') as input
def test_transaction_amount_is_high(repo_fixture: TestRepo):
transaction_amount_is_high = repo_fixture.get_feature_view("transaction_amount_is_high")
transaction_request = pandas.DataFrame({"amount": [124, 10001, 34235436234]})
# Use a MockContext to mock any secrets or resources
mock_context = MockContext(secrets={"my_secret": "my_secret_value"})
actual = transaction_amount_is_high.run_transformation(
input_data={
"transaction_request": transaction_request,
"context": mock_context,
},
).to_pandas()
expected = pandas.DataFrame({"transaction_amount_is_high": [0, 1, 1]})
pandas.testing.assert_frame_equal(actual, expected)
```
## Overview
Tecton supports unit testing for feature definitions to ensure the correctness
of feature definitions. Unit tests can be defined in feature repositories in
file paths matching the pattern `**/tests/*.py`.
## Running Tests
- CRITICAL: Always run your unit tests with tecton test from the root of the feature directory. Do NOT invoke pytest directly.
## CRITICAL: Test Method Selection
### Use `get_features_for_events` when:
- Testing features that use Tecton's Aggregation Engine (e.g., `@batch_feature_view` that have `Aggregate` features)
### Use `run_transformation` when:
- Testing custom SQL transformations or derived metrics that don't use `Aggregate` features. It's important that you NEVER use run_transformation for FeatureViews that declare `Aggregate` features.
- Verifying raw transformation output
- Testing Realtime Feature Views
`run_transformation` supports the following parameters for batch and streaming feature views:
- start_time
- end_time
- mock_inputs: dictionary that maps strings (named after the FV function parameters) to dataframes. One for every source of the FV
It's critical that the difference between the start_time and the end_time you're passing in matches the `batch_schedule` you've defined on the FeatureView. For example, if batch_schedule is set to 1 day, then you must ensure that the start_time is exactly 1d prior to the end_time.
`run_transformation` supports the following parameters for Realtime Feature Views:
- input_data: A dictionary that maps strings (named after the FV function parameters) to objects you want to pass in
`run_transformation` returns an instance of `TectonDataFrame`. You can call `to_pandas()` on it to turn it into a Pandas DF.
## Examples of RealtimeFeatureView Tests
Consider a Feature View that determines if a transaction amount is
high:
```python
from tecton import RequestSource, realtime_feature_view, Attribute
from tecton.types import Float64, Int64
import pandas
# Define the request schema
transaction_request = RequestSource(schema=[Field("amount", Float64)])
# Define the output features
output_features = [Attribute("transaction_amount_is_high", Int64)]
# This Realtime Feature View evaluates a transaction amount and declares it as "high", if it's higher than 10,000
@realtime_feature_view(
sources=[transaction_request],
mode="pandas",
features=output_features,
description="Whether the transaction amount is considered high (over $10000)",
secrets={"my_secret": Secret(scope="my_scope", key="my_key")},
)
def transaction_amount_is_high(transaction_request: pandas.DataFrame, context):
import pandas as pd
# Access the Tecton Secret from the RealtimeContext
my_secret = context.secrets["my_secret"]
df = pd.DataFrame()
df["transaction_amount_is_high"] = (transaction_request["amount"] >= 10000).astype("int64")
return df
```
With the above Feature View, we can define the unit test that mocks up the
necessary input data, secrets and resources and asserts that we're getting the
expected result. The `MockContext` class can be used to mock any
Secrets or
Resources
that are required by the Feature View.
```python
### tests/transaction_amount_is_high.py ###
from tecton import TestRepo
import pandas
# Testing the 'transaction_amount_is_high' feature which depends on request data ('amount') as input
def test_transaction_amount_is_high(repo_fixture: TestRepo):
transaction_amount_is_high = repo_fixture.get_feature_view("transaction_amount_is_high")
transaction_request = pandas.DataFrame({"amount": [124, 10001, 34235436234]})
# Use a MockContext to mock any secrets or resources
mock_context = MockContext(secrets={"my_secret": "my_secret_value"})
actual = transaction_amount_is_high.run_transformation(
input_data={
"transaction_request": transaction_request,
"context": mock_context,
},
).to_pandas()
expected = pandas.DataFrame({"transaction_amount_is_high": [0, 1, 1]})
pandas.testing.assert_frame_equal(actual, expected)
```
## Examples of Spark-based Feature Views Unit Tests
Creating a unit test in a PySpark or Spark SQL feature view is similar to the
above example, except that we provide a `SparkSession` in the test code.
For example, consider a Feature View that determines if a user has good credit:
```python
### user_has_good_credit.py ###
from tecton import batch_feature_view, Attribute
from tecton.types import Bool
from fraud.entities import user
from fraud.data_sources.credit_scores_batch import credit_scores_batch
from datetime import datetime, timedelta
@batch_feature_view(
sources=[credit_scores_batch],
entities=[user],
timestamp_field="timestamp",
features=[Attribute("user_has_good_credit", Bool)],
mode="spark_sql",
online=True,
offline=True,
feature_start_time=datetime(2021, 1, 1),
batch_schedule=timedelta(days=1),
ttl=timedelta(days=120),
)
def user_has_good_credit(credit_scores):
return f"""
SELECT
user_id,
credit_score > 670 as user_has_good_credit,
timestamp
FROM
{credit_scores}
"""
```
Because this is a Spark SQL Feature View, we'll need a `SparkSession` to test.
Tecton provides the `tecton_pytest_spark_session` pytest
@fixture. This fixture creates a
`SparkSession`.
Finally, we can define the actual unit test that mocks up some sample inputs,
and asserts that we're getting the expected result.
You should ensure that the mock data schema exactly matches the source schema.
Any `datetime` partition columns that may be present need to match, too.
Example of a unit test
```python
def test_user_credit_card_issuer_ghf(tecton_pytest_spark_session):
input_pandas_df = pandas.DataFrame(
{
"user_id": ["user_1", "user_2", "user_3", "user_4"],
"signup_timestamp": [datetime(2022, 5, 1)] * 4,
"cc_num": [1000000000000000, 4000000000000000, 5000000000000000, 6000000000000000],
}
)
input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df)
events = pandas.DataFrame(
{
"user_id": ["user_1", "user_1", "user_2", "user_not_found"],
"timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)],
}
)
# Simulate materializing features for May 1st.
output = user_credit_card_issuer.get_features_for_events(events, mock_inputs={"fraud_users_batch": input_spark_df})
actual = output.to_pandas()
expected = pandas.DataFrame(
{
"user_id": ["user_1", "user_1", "user_2", "user_not_found"],
"timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)],
"user_credit_card_issuer__credit_card_issuer": [None, "other", "Visa", None],
}
)
# NOTE: because the Spark join has non-deterministic ordering, it is important to
# sort the dataframe to avoid test flakes.
actual = actual.sort_values(["user_id", "timestamp"]).reset_index(drop=True)
expected = expected.sort_values(["user_id", "timestamp"]).reset_index(drop=True)
pandas.testing.assert_frame_equal(actual, expected)
```
### Configure the Local Test Spark Session
Tecton provides a Pytest session-scoped `tecton_pytest_spark_session` fixture.
However, that Spark session may not be configured correctly for your tests. In
that case, you may either configure the Tecton-provided fixture or create your
own Spark session.
Here's an example of configuring the Tecton-provided Spark session:
```python
import pytest
@pytest.fixture(scope="module", autouse=True)
def configure_spark_session(tecton_pytest_spark_session):
# Custom configuration for the spark session.
tecton_pytest_spark_session.conf.set("spark.sql.session.timeZone", "UTC")
```
Here's an example of how to create your own Spark session and provide it to
Tecton:
```python
from importlib import resources
@pytest.fixture(scope="session")
def my_custom_spark_session():
"""Returns a custom spark session configured for use in Tecton unit testing."""
with resources.path("tecton_spark.jars", "tecton-udfs-spark-3.jar") as path:
tecton_udf_jar_path = str(path)
spark = (
SparkSession.builder.appName("my_custom_spark_session")
.config("spark.jars", tecton_udf_jar_path)
# This short-circuit's Spark's attempt to auto-detect a hostname for the master address, which can lead to
# errors on hosts with "unusual" hostnames that Spark believes are invalid.
.config("spark.driver.host", "localhost")
.config("spark.sql.session.timeZone", "UTC")
.getOrCreate()
)
try:
tecton.set_tecton_spark_session(spark)
yield spark
finally:
spark.stop()
```
## Test Structure
### 1. Test Data Setup
- Start with a clear docstring describing what aspect is being tested
- Create minimal but complete mock input data
- Document purpose of each test record with comments
- Include edge cases in test data (nulls, zeros, boundaries)
### 2. Expected Values
- Calculate expected values manually first
- Document calculations in comments above assertions
- Break down complex calculations into steps
- Consider edge cases and their expected outcomes
### 3. Feature Computation
- Choose appropriate test method (get_features_for_events vs run_transformation)
- Set up correct time windows for feature retrieval
- Ensure all required mock inputs are provided
- Consider batch schedule constraints
### 4. Assertions
- Group assertions by feature type (time-based, basic metrics, derived metrics)
- Test both column presence and values
- Include descriptive error messages
- Use appropriate tolerance for floating-point comparisons
- Test NULL values explicitly where expected
## Best Practices
### Time Window Handling
- Document the time window being tested
- Test window boundary conditions
- Consider timezone handling if relevant
### Error Messages
- Include context about what's being tested
- Show calculation of expected values
- Reference specific features being tested
- Use f-strings for formatting
- Include actual vs expected values
## Common Pitfalls to Avoid
- Using PySpark calculations for expected values
- Not documenting the reason for NULL expectations
- Missing edge cases in test data
- Not accounting for time window boundaries
- Using exact equality for floating-point comparisons
- Not testing both presence and values of features
- Insufficient error messages in assertions
- Not considering batch schedule constraints
- Don't just give up and end up skipping a unit test you've created. Fix it! If you don't know how, ask the user for help
- Don't ever try to use the `get_historical_features` method. It's deprecated
## How to debug a unit test
If your unit test isn't returning the feature values you expect, try the following techniques:
- If the FeatureView is defined using SQL, print the fully assembled SQL query. Analyze the query to ensure it aligns with your expectations.
- A common issue is unintended filtering—especially on time ranges—which can silently exclude expected data.
- If reviewing the SQL isn’t enough, inspect the mock data provided to the unit test:
- The parameters passed into your FeatureView function typically include temporary table names
- You can query those temporary tables directly to see what data is available during the test.
- If the temporary tables show less data than you'd expect, a common root cause is that Tecton filters the input data by timestamp. It filters out anything that's not within the start_time and end_time timerange unless you explicitly call ".unfiltered()" on the data source when you reference it in the FV definition's `sources` parameter.
Example:
```python
@batch_feature_view(
sources=[transactions_snowflake],
# ...
)
def feature_view_name(transactions, context=materialization_context()):
# Keep explicit ISO formatting for timestamp
end_time_iso = context.end_time.isoformat()
# Define the SQL f-string, using single braces for source as per docs
sql_query = f"""
SELECT
user_id,
AVG(amount) AS avg_transaction_amount_30d,
TO_TIMESTAMP('{end_time_iso}') - INTERVAL 1 MICROSECOND AS feature_timestamp
FROM {transactions}
WHERE transaction_time >= (TO_TIMESTAMP('{end_time_iso}') - INTERVAL 30 DAY)
AND transaction_time < TO_TIMESTAMP('{end_time_iso}')
GROUP BY
user_id
"""
print(sql_query)
from pyspark.sql import SparkSession
# Get the current Spark session
spark = SparkSession.builder.getOrCreate()
df = spark.sql(f"SELECT * FROM {transactions}")
# Show the data
df.show(truncate=False)
return sql_query
```
Example:
```python
from pyspark.sql import SparkSession
# Get the current Spark session
spark = SparkSession.builder.getOrCreate()
# Replace 'temp_table_name' with the actual name passed into your function
df = spark.sql("SELECT * FROM temp_table_name")
# Show the data
df.show(truncate=False)
```
## More Examples of unit tests
### Example of a unit test for a batch feature view that uses incremental_materialization=True
```python
import pytest
import pandas as pd
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
from tecton import conf, TestRepo, FeatureView, MaterializationContext
# Remove direct import of the feature view
# from new_feature_o1 import product_store_performance_features, transactions_src, products_src, stores_src
# Configure Tecton to skip online validation during tests
conf.set("TECTON_SKIP_OBJECT_VALIDATION", "True")
# Helper function to compare Spark DataFrames (handles potential ordering issues)
# Note: For more robust comparison, consider libraries like chispa or detailed checks.
def assert_spark_df_equal(actual_df, expected_df):
""" Basic comparison of Spark DataFrames by converting to Pandas """
# Sort by key columns to ensure order doesn't affect comparison
# Adjust key columns based on your entity join keys
key_cols = ['product_category', 'store_region']
actual_pdf = actual_df.sort(key_cols).toPandas()
expected_pdf = expected_df.sort(key_cols).toPandas()
# Basic schema check (column names)
assert sorted(actual_pdf.columns) == sorted(expected_pdf.columns), \
f"Column mismatch: {sorted(actual_pdf.columns)} vs {sorted(expected_pdf.columns)}"
# Reorder columns for consistent comparison
expected_pdf = expected_pdf[actual_pdf.columns]
pd.testing.assert_frame_equal(actual_pdf, expected_pdf, check_dtype=False, rtol=1e-5)
# Use the standard Tecton spark session fixture
@pytest.fixture(scope='module')
def spark(tecton_pytest_spark_session: SparkSession) -> SparkSession:
""" Provides the Tecton SparkSession fixture. """
# Optional: Add custom Spark confs if needed
# tecton_pytest_spark_session.conf.set("spark.sql.session.timeZone", "UTC")
return tecton_pytest_spark_session
# Mock MaterializationContext for testing (Using Tecton's class is better if available/needed)
# For run_transformation, just passing the context values might suffice.
# If Tecton's MaterializationContext is needed for complex scenarios, import it.
# class MockMaterializationContext:
# def __init__(self, end_time):
# # Ensure end_time is a string in the expected format for SQL interpolation
# if isinstance(end_time, datetime):
# self.end_time = end_time.strftime("%Y-%m-%d %H:%M:%S.%f")
# else:
# self.end_time = end_time
def test_product_store_performance_features(repo_fixture: TestRepo, spark: SparkSession):
""" Tests the product_store_performance_features batch feature view transformation using run_transformation. """
# Get the FeatureView object from the repository fixture
fv_under_test: FeatureView = repo_fixture.get_feature_view("product_store_performance_features")
# Define the context end time for the test run
# test_end_time = datetime(2023, 1, 1, 0, 0, 0)
# Create a context object suitable for run_transformation (might just need dict)
# test_context = MaterializationContext(end_time=test_end_time) # Removed: Context likely handled differently
# --- 1. Define Sample Input Data (using the same data as before) ---
transactions_data = {
'transaction_id': [1, 2, 3, 4, 5, 6],
'product_id': [101, 101, 102, 103, 102, 101],
'store_id': [1, 2, 1, 1, 2, 1],
'transaction_amount': [10.0, 15.0, 20.0, 5.0, 25.0, 12.0],
'TRANSACTION_TIMESTAMP': [
datetime(2022, 6, 15), datetime(2022, 7, 1), datetime(2022, 1, 10),
datetime(2022, 11, 5), datetime(2022, 12, 20), datetime(2021, 12, 30)
],
'transaction_status': ['completed', 'completed', 'completed', 'pending', 'completed', 'completed']
}
transactions_pdf = pd.DataFrame(transactions_data)
transactions_pdf['TRANSACTION_TIMESTAMP'] = pd.to_datetime(transactions_pdf['TRANSACTION_TIMESTAMP'])
products_data = {
'product_id': [101, 102, 103],
'product_category': ['Electronics', 'Home Goods', 'Electronics']
}
products_pdf = pd.DataFrame(products_data)
stores_data = {
'store_id': [1, 2],
'store_region': ['North', 'South']
}
stores_pdf = pd.DataFrame(stores_data)
# Convert Pandas DFs to Spark DFs
transactions_df = spark.createDataFrame(transactions_pdf)
products_df = spark.createDataFrame(products_pdf)
stores_df = spark.createDataFrame(stores_pdf)
# Register mock DFs as temp views so the SQL query can find them
transactions_df.createOrReplaceTempView("transactions")
products_df.createOrReplaceTempView("products")
stores_df.createOrReplaceTempView("stores")
# --- 2. Define Expected Output (using the same calculated data) ---
# Need to define test_end_time here for expected output calculation
test_end_time = datetime(2023, 1, 1, 0, 0, 0)
test_start_time = test_end_time - timedelta(days=1) # Define a start_time for the run
test_feature_timestamp = test_end_time - timedelta(microseconds=1)
expected_output_data = {
'product_category': ['Electronics', 'Electronics', 'Home Goods', 'Home Goods' ],
'store_region': ['North', 'South', 'North', 'South' ],
'transaction_count': [1, 1, 1, 1 ],
'total_revenue': [10.0, 15.0, 20.0, 25.0 ],
'day_span': [0, 0, 0, 0 ],
'active_days': [1, 1, 1, 1 ],
'activity_density': [None, None, None, None ],
'daily_revenue_rate': [None, None, None, None ],
'transactions_per_active_day': [1.0, 1.0, 1.0, 1.0 ],
'category_revenue_share': [10.0/25.0, 15.0/25.0, 20.0/45.0, 25.0/45.0 ],
'category_transaction_share': [1.0/2.0, 1.0/2.0, 1.0/2.0, 1.0/2.0 ],
'region_revenue_share': [10.0/30.0, 15.0/40.0, 20.0/30.0, 25.0/40.0 ],
'transaction_amount_stddev': [None, None, None, None ],
'transaction_amount_variance': [None, None, None, None ],
'days_since_last_transaction': [200, 184, 356, 12 ],
'FEATURE_TIMESTAMP': [test_feature_timestamp] * 4
}
expected_output_pdf = pd.DataFrame(expected_output_data)
expected_output_pdf['FEATURE_TIMESTAMP'] = pd.to_datetime(expected_output_pdf['FEATURE_TIMESTAMP'])
float_cols = ['total_revenue', 'activity_density', 'daily_revenue_rate', 'transactions_per_active_day',
'category_revenue_share', 'category_transaction_share', 'region_revenue_share',
'transaction_amount_stddev', 'transaction_amount_variance']
for col in float_cols:
if col in expected_output_pdf.columns:
expected_output_pdf[col] = expected_output_pdf[col].astype(float)
expected_output_df = spark.createDataFrame(expected_output_pdf)
# --- 3. Run Transformation using run_transformation ---
# Map source *parameter names* (from the function definition) to the mock Spark DFs
mock_inputs_dict = {
# Use the parameter names of the BFV function
"transactions": transactions_df,
"products": products_df,
"stores": stores_df,
}
actual_output_tecton_df = fv_under_test.run_transformation(
mock_inputs=mock_inputs_dict,
# Removed context parameter: context=test_context
# Need to specify the end_time for the run
start_time=test_start_time, # Added start_time
end_time=test_end_time
)
actual_output_df = actual_output_tecton_df.to_spark()
# --- 4. Compare Actual vs Expected ---
assert_spark_df_equal(actual_output_df, expected_output_df)
```