# Metrics & Attributes
Track custom metrics and attributes during task execution for richer evaluation insights.
## Overview
While executing evaluation tasks, you can record:
- **Metrics** - Numeric values (int/float) for quantitative measurements
- **Attributes** - Values of any type for qualitative information (model names, feature flags, structured data)
These appear in evaluation reports and can be used by evaluators for assessment.
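For a quick end-to-end picture, here is a minimal sketch (the task name and recorded values are illustrative) that records one metric and one attribute inside a task and reads them back from the report:

```python
from pydantic_evals import Case, Dataset, increment_eval_metric, set_eval_attribute


def word_count_task(inputs: str) -> str:
    # Recorded while the task runs and attached to this case's results
    increment_eval_metric('input_words', len(inputs.split()))
    set_eval_attribute('uppercased_output', True)
    return inputs.upper()


dataset = Dataset(cases=[Case(inputs='hello world')], evaluators=[])
report = dataset.evaluate_sync(word_count_task)
for case in report.cases:
    print(case.metrics, case.attributes)
    # e.g. {'input_words': 2} {'uppercased_output': True}
```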
## Recording Metrics
Use [`increment_eval_metric`][pydantic_evals.increment_eval_metric] to track numeric values:
```python
from dataclasses import dataclass
from pydantic_evals import increment_eval_metric
@dataclass
class APIResult:
output: str
usage: 'Usage'
@dataclass
class Usage:
total_tokens: int
def call_api(inputs: str) -> APIResult:
return APIResult(output=f'Result: {inputs}', usage=Usage(total_tokens=100))
def my_task(inputs: str) -> str:
# Track API calls
increment_eval_metric('api_calls', 1)
result = call_api(inputs)
# Track tokens used
increment_eval_metric('tokens_used', result.usage.total_tokens)
return result.output
```
## Recording Attributes
Use [`set_eval_attribute`][pydantic_evals.set_eval_attribute] to store any data:
```python
from pydantic_evals import set_eval_attribute
def process(inputs: str) -> str:
return f'Processed: {inputs}'
def my_task(inputs: str) -> str:
# Record which model was used
set_eval_attribute('model', 'gpt-4o')
# Record feature flags
set_eval_attribute('used_cache', True)
set_eval_attribute('retry_count', 2)
# Record structured data
set_eval_attribute('config', {
'temperature': 0.7,
'max_tokens': 100,
})
return process(inputs)
```
## Accessing in Evaluators
Metrics and attributes are available in the [`EvaluatorContext`][pydantic_evals.evaluators.EvaluatorContext]:
```python
from dataclasses import dataclass
from pydantic_evals.evaluators import Evaluator, EvaluatorContext
@dataclass
class EfficiencyChecker(Evaluator):
max_api_calls: int = 5
def evaluate(self, ctx: EvaluatorContext) -> dict[str, bool]:
# Access metrics
api_calls = ctx.metrics.get('api_calls', 0)
tokens_used = ctx.metrics.get('tokens_used', 0)
# Access attributes
used_cache = ctx.attributes.get('used_cache', False)
return {
'efficient_api_usage': api_calls <= self.max_api_calls,
'used_caching': used_cache,
'token_efficient': tokens_used < 1000,
}
```
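To see these checks run end to end, the evaluator can be attached to a dataset alongside a task that records the values it reads. A sketch, assuming the `EfficiencyChecker` defined above is in scope (`tracked_task` is an illustrative name):

```python
from pydantic_evals import Case, Dataset, increment_eval_metric, set_eval_attribute


def tracked_task(inputs: str) -> str:
    # Record the values EfficiencyChecker reads from ctx.metrics / ctx.attributes
    increment_eval_metric('api_calls', 1)
    increment_eval_metric('tokens_used', 120)
    set_eval_attribute('used_cache', False)
    return f'Result: {inputs}'


dataset = Dataset(
    cases=[Case(inputs='test')],
    evaluators=[EfficiencyChecker(max_api_calls=3)],
)
report = dataset.evaluate_sync(tracked_task)
report.print()
```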
## Viewing in Reports
Metrics and attributes appear in report data:
```python
from pydantic_evals import Case, Dataset
def task(inputs: str) -> str:
return f'Result: {inputs}'
dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])
report = dataset.evaluate_sync(task)
for case in report.cases:
print(f'{case.name}:')
#> Case 1:
print(f' Metrics: {case.metrics}')
#> Metrics: {}
print(f' Attributes: {case.attributes}')
#> Attributes: {}
```
Metrics and attributes are not shown in the default printed report table; access them programmatically as above, or view them in Logfire.
## Automatic Metrics
When using Pydantic AI and Logfire, some metrics are automatically tracked:
```python
import logfire
from pydantic_ai import Agent
logfire.configure(send_to_logfire='if-token-present')
agent = Agent('openai:gpt-4o')
async def ai_task(inputs: str) -> str:
result = await agent.run(inputs)
return result.output
```

The following metrics are tracked automatically:

- `requests` - Number of LLM calls
- `input_tokens` - Total input tokens
- `output_tokens` - Total output tokens
- `prompt_tokens` - Prompt tokens (if available)
- `completion_tokens` - Completion tokens (if available)
- `cost` - Estimated cost (if using genai-prices)
Access these in evaluators:
```python
from dataclasses import dataclass
from pydantic_evals.evaluators import Evaluator, EvaluatorContext
@dataclass
class CostChecker(Evaluator):
max_cost: float = 0.01 # $0.01
def evaluate(self, ctx: EvaluatorContext) -> bool:
cost = ctx.metrics.get('cost', 0.0)
return cost <= self.max_cost
```
## Practical Examples
### API Usage Tracking
```python
from dataclasses import dataclass
from pydantic_evals import increment_eval_metric, set_eval_attribute
from pydantic_evals.evaluators import Evaluator, EvaluatorContext
def check_cache(inputs: str) -> str | None:
return None # No cache hit for demo
@dataclass
class APIResult:
text: str
usage: 'Usage'
@dataclass
class Usage:
total_tokens: int
async def call_api(inputs: str) -> APIResult:
return APIResult(text=f'Result: {inputs}', usage=Usage(total_tokens=100))
def save_to_cache(inputs: str, result: str) -> None:
pass # Save to cache
async def smart_task(inputs: str) -> str:
# Try cache first
if cached := check_cache(inputs):
set_eval_attribute('cache_hit', True)
return cached
set_eval_attribute('cache_hit', False)
# Call API
increment_eval_metric('api_calls', 1)
result = await call_api(inputs)
increment_eval_metric('tokens', result.usage.total_tokens)
# Cache result
save_to_cache(inputs, result.text)
return result.text
# Evaluate efficiency
@dataclass
class EfficiencyEvaluator(Evaluator):
def evaluate(self, ctx: EvaluatorContext) -> dict[str, bool | float]:
api_calls = ctx.metrics.get('api_calls', 0)
cache_hit = ctx.attributes.get('cache_hit', False)
return {
'used_cache': cache_hit,
'made_api_call': api_calls > 0,
'efficiency_score': 1.0 if cache_hit else 0.5,
}
```
### Tool Usage Tracking
```python
from dataclasses import dataclass
from pydantic_ai import Agent, RunContext
from pydantic_evals import increment_eval_metric, set_eval_attribute
from pydantic_evals.evaluators import Evaluator, EvaluatorContext
agent = Agent('openai:gpt-4o')
def search(query: str) -> str:
return f'Search results for: {query}'
def call(endpoint: str) -> str:
return f'API response from: {endpoint}'
@agent.tool
def search_database(ctx: RunContext, query: str) -> str:
increment_eval_metric('db_searches', 1)
set_eval_attribute('last_query', query)
return search(query)
@agent.tool
def call_api(ctx: RunContext, endpoint: str) -> str:
increment_eval_metric('api_calls', 1)
set_eval_attribute('last_endpoint', endpoint)
return call(endpoint)
# Evaluate tool usage
@dataclass
class ToolUsageEvaluator(Evaluator):
def evaluate(self, ctx: EvaluatorContext) -> dict[str, bool | int]:
db_searches = ctx.metrics.get('db_searches', 0)
api_calls = ctx.metrics.get('api_calls', 0)
return {
'used_database': db_searches > 0,
'used_api': api_calls > 0,
'tool_call_count': db_searches + api_calls,
'reasonable_tool_usage': (db_searches + api_calls) <= 5,
}
```
### Performance Tracking
```python
import time
from dataclasses import dataclass
from pydantic_evals import increment_eval_metric, set_eval_attribute
from pydantic_evals.evaluators import Evaluator, EvaluatorContext
async def retrieve_context(inputs: str) -> list[str]:
return ['context1', 'context2']
async def generate_response(context: list[str], inputs: str) -> str:
return f'Generated response for {inputs}'
async def monitored_task(inputs: str) -> str:
# Track sub-operation timing
t0 = time.perf_counter()
context = await retrieve_context(inputs)
retrieve_time = time.perf_counter() - t0
increment_eval_metric('retrieve_time', retrieve_time)
t0 = time.perf_counter()
result = await generate_response(context, inputs)
generate_time = time.perf_counter() - t0
increment_eval_metric('generate_time', generate_time)
# Record which operations were needed
set_eval_attribute('needed_retrieval', len(context) > 0)
set_eval_attribute('context_chunks', len(context))
return result
# Evaluate performance
@dataclass
class PerformanceEvaluator(Evaluator):
max_retrieve_time: float = 0.5
max_generate_time: float = 2.0
def evaluate(self, ctx: EvaluatorContext) -> dict[str, bool]:
retrieve_time = ctx.metrics.get('retrieve_time', 0.0)
generate_time = ctx.metrics.get('generate_time', 0.0)
return {
'fast_retrieval': retrieve_time <= self.max_retrieve_time,
'fast_generation': generate_time <= self.max_generate_time,
}
```
### Quality Tracking
```python
from dataclasses import dataclass
from pydantic_evals import set_eval_attribute
from pydantic_evals.evaluators import Evaluator, EvaluatorContext
async def llm_call(inputs: str) -> dict:
return {'text': f'Response: {inputs}', 'confidence': 0.85, 'sources': ['doc1', 'doc2']}
async def quality_task(inputs: str) -> str:
result = await llm_call(inputs)
# Extract quality indicators
confidence = result.get('confidence', 0.0)
sources_used = result.get('sources', [])
set_eval_attribute('confidence', confidence)
set_eval_attribute('source_count', len(sources_used))
set_eval_attribute('sources', sources_used)
return result['text']
# Evaluate based on quality signals
@dataclass
class QualityEvaluator(Evaluator):
min_confidence: float = 0.7
def evaluate(self, ctx: EvaluatorContext) -> dict[str, bool | float]:
confidence = ctx.attributes.get('confidence', 0.0)
source_count = ctx.attributes.get('source_count', 0)
return {
'high_confidence': confidence >= self.min_confidence,
'used_sources': source_count > 0,
'quality_score': confidence * (1.0 + 0.1 * source_count),
}
```
## Metrics vs Attributes vs Metadata
Understanding the differences:
| Feature | Metrics | Attributes | Metadata |
|---------|---------|------------|----------|
| **Set in** | Task execution | Task execution | Case definition |
| **Type** | int, float | Any | Any |
| **Purpose** | Quantitative | Qualitative | Test case context |
| **Used for** | Aggregation | Context | Evaluator logic & reporting |
| **Available to** | Evaluators | Evaluators | Evaluators |
```python
from pydantic_evals import Case, increment_eval_metric, set_eval_attribute
# Metadata: Defined in case (before execution)
Case(
inputs='question',
metadata={'difficulty': 'hard', 'category': 'math'},
)
# Metrics & Attributes: Recorded during execution
def task(inputs: str) -> str:
increment_eval_metric('tokens', 100)
set_eval_attribute('model', 'gpt-4o')
return f'Result: {inputs}'
```
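To make the distinction concrete, an evaluator can read all three side by side. An illustrative sketch (`ThreeWayChecker` is not part of the library):

```python
from dataclasses import dataclass

from pydantic_evals.evaluators import Evaluator, EvaluatorContext


@dataclass
class ThreeWayChecker(Evaluator):
    def evaluate(self, ctx: EvaluatorContext) -> dict[str, bool]:
        # Metadata comes from the Case definition; metrics and attributes
        # were recorded while the task ran.
        metadata = ctx.metadata or {}
        return {
            'is_hard_case': metadata.get('difficulty') == 'hard',
            'stayed_in_budget': ctx.metrics.get('tokens', 0) < 500,
            'expected_model': ctx.attributes.get('model') == 'gpt-4o',
        }
```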
## Troubleshooting
### "Metrics/attributes not appearing"
Ensure you're calling the functions inside the task:
```python
from pydantic_evals import increment_eval_metric
def process(inputs: str) -> str:
return f'Processed: {inputs}'
# Bad: Called outside task
increment_eval_metric('count', 1)
def bad_task(inputs: str) -> str:
return process(inputs)
# Good: Called inside task
def good_task(inputs: str) -> str:
increment_eval_metric('count', 1)
return process(inputs)
```
### "Metrics not incrementing"
Check you're using `increment_eval_metric`, not `set_eval_attribute`:
```python
from pydantic_evals import increment_eval_metric, set_eval_attribute
# Bad: This will overwrite, not increment
set_eval_attribute('count', 1)
set_eval_attribute('count', 1) # Still 1
# Good: This increments
increment_eval_metric('count', 1)
increment_eval_metric('count', 1) # Now 2
```
### "Too much data in attributes"
Store summaries, not raw data:
```python
from pydantic_evals import set_eval_attribute
giant_response_object = {'key' + str(i): 'value' * 100 for i in range(1000)}
# Bad: Huge object
set_eval_attribute('full_response', giant_response_object)
# Good: Summary
set_eval_attribute('response_size_kb', len(str(giant_response_object)) / 1024)
set_eval_attribute('response_keys', list(giant_response_object.keys())[:10]) # First 10 keys
```
## Next Steps
- **[Custom Evaluators](../evaluators/custom.md)** - Use metrics/attributes in evaluators
- **[Logfire Integration](logfire-integration.md)** - View metrics in Logfire
- **[Concurrency & Performance](concurrency.md)** - Optimize evaluation performance