# Retry Strategies

Handle transient failures in tasks and evaluators with automatic retry logic.

## Overview

LLM-based systems can experience transient failures:

- Rate limits
- Network timeouts
- Temporary API outages
- Context length errors

Pydantic Evals supports retry configuration for both:

- **Task execution** - The function being evaluated
- **Evaluator execution** - The evaluators themselves

## Basic Retry Configuration

Pass a retry configuration to `evaluate()` or `evaluate_sync()` using [Tenacity](https://tenacity.readthedocs.io/) parameters:

```python
from tenacity import stop_after_attempt

from pydantic_evals import Case, Dataset


def my_function(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

report = dataset.evaluate_sync(
    task=my_function,
    retry_task={'stop': stop_after_attempt(3)},
    retry_evaluators={'stop': stop_after_attempt(2)},
)
```

## Retry Configuration Options

Retry configurations use [Tenacity](https://tenacity.readthedocs.io/) and support the same options as Pydantic AI's [`RetryConfig`][pydantic_ai.retries.RetryConfig]:

```python
from tenacity import stop_after_attempt, wait_exponential

from pydantic_evals import Case, Dataset


def my_function(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

retry_config = {
    'stop': stop_after_attempt(3),  # Stop after 3 attempts
    'wait': wait_exponential(multiplier=1, min=1, max=10),  # Exponential backoff: 1s, 2s, 4s, 8s (capped at 10s)
    'reraise': True,  # Re-raise the original exception after exhausting retries
}

dataset.evaluate_sync(
    task=my_function,
    retry_task=retry_config,
)
```

### Common Parameters

The retry configuration accepts any parameters from the tenacity `retry` decorator.
Common ones include:

| Parameter | Type | Description |
|-----------|------|-------------|
| `stop` | `StopBaseT` | Stop strategy (e.g., `stop_after_attempt(3)`, `stop_after_delay(60)`) |
| `wait` | `WaitBaseT` | Wait strategy (e.g., `wait_exponential()`, `wait_fixed(2)`) |
| `retry` | `RetryBaseT` | Retry condition (e.g., `retry_if_exception_type(TimeoutError)`) |
| `reraise` | `bool` | Whether to reraise the original exception (default: `False`) |
| `before_sleep` | `Callable` | Callback before sleeping between retries |

See the [Tenacity documentation](https://tenacity.readthedocs.io/) for all available options.

## Task Retries

Retry the task function when it fails:

```python
from tenacity import stop_after_attempt, wait_exponential

from pydantic_evals import Case, Dataset


async def call_llm(inputs: str) -> str:
    return f'LLM response to: {inputs}'


async def flaky_llm_task(inputs: str) -> str:
    """This might hit rate limits or time out."""
    response = await call_llm(inputs)
    return response


dataset = Dataset(cases=[Case(inputs='test')])

report = dataset.evaluate_sync(
    task=flaky_llm_task,
    retry_task={
        'stop': stop_after_attempt(5),  # Try up to 5 times
        'wait': wait_exponential(multiplier=1, min=1, max=30),  # Exponential backoff, capped at 30s
        'reraise': True,
    },
)
```

### When Task Retries Trigger

Retries trigger when the task raises an exception:

```python
class RateLimitError(Exception):
    pass


class ValidationError(Exception):
    pass


async def call_api(inputs: str) -> str:
    return f'API response: {inputs}'


async def my_task(inputs: str) -> str:
    try:
        return await call_api(inputs)
    except RateLimitError:
        # Will trigger retry
        raise
    except ValidationError:
        # Will also trigger retry
        raise
```

### Exponential Backoff

When using `wait_exponential()`, delays increase exponentially:

```
Attempt 1: immediate
Attempt 2: ~1s delay (multiplier * 2^0)
Attempt 3: ~2s delay (multiplier * 2^1)
Attempt 4: ~4s delay (multiplier * 2^2)
Attempt 5: ~8s delay (multiplier * 2^3, capped at max)
```

The actual delay depends on the `multiplier`, `min`, and `max` parameters passed to `wait_exponential()`.

## Evaluator Retries

Retry evaluators when they fail:

```python
from tenacity import stop_after_attempt, wait_exponential

from pydantic_evals import Case, Dataset
from pydantic_evals.evaluators import LLMJudge


def my_task(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(
    cases=[Case(inputs='test')],
    evaluators=[
        # LLMJudge might hit rate limits
        LLMJudge(rubric='Response is accurate'),
    ],
)

report = dataset.evaluate_sync(
    task=my_task,
    retry_evaluators={
        'stop': stop_after_attempt(3),
        'wait': wait_exponential(multiplier=1, min=0.5, max=10),
        'reraise': True,
    },
)
```

### When Evaluator Retries Trigger

Retries trigger when an evaluator raises an exception:

```python
from dataclasses import dataclass

from pydantic_evals.evaluators import Evaluator, EvaluatorContext


async def external_api_call(output: str) -> bool:
    return len(output) > 0


@dataclass
class APIEvaluator(Evaluator):
    async def evaluate(self, ctx: EvaluatorContext) -> bool:
        # If this raises an exception, retry logic will trigger
        result = await external_api_call(ctx.output)
        return result
```

### Evaluator Failures

If an evaluator fails after all retries, it's recorded as an [`EvaluatorFailure`][pydantic_evals.evaluators.EvaluatorFailure]:

```python
from tenacity import stop_after_attempt

from pydantic_evals import Case, Dataset


def task(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

report = dataset.evaluate_sync(task, retry_evaluators={'stop': stop_after_attempt(3)})

# Check for evaluator failures
for case in report.cases:
    if case.evaluator_failures:
        for failure in case.evaluator_failures:
            print(f'Evaluator {failure.name} failed: {failure.error_message}')
#> (No output - no evaluator failures in this case)
```

View evaluator failures in reports:

```python
from pydantic_evals import Case, Dataset


def task(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

report = dataset.evaluate_sync(task)
report.print(include_evaluator_failures=True)
"""
Evaluation Summary: task
┏━━━━━━━━━━┳━━━━━━━━━━┓
┃ Case ID  ┃ Duration ┃
┡━━━━━━━━━━╇━━━━━━━━━━┩
│ Case 1   │ 10ms     │
├──────────┼──────────┤
│ Averages │ 10ms     │
└──────────┴──────────┘
"""
#>
#> ✅ case_0 ━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% (0/0)
```

## Combining Task and Evaluator Retries

You can configure both independently:

```python
from tenacity import stop_after_attempt, wait_exponential

from pydantic_evals import Case, Dataset


def flaky_task(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

report = dataset.evaluate_sync(
    task=flaky_task,
    retry_task={
        'stop': stop_after_attempt(5),  # Retry task up to 5 times
        'wait': wait_exponential(multiplier=1, min=1, max=30),
        'reraise': True,
    },
    retry_evaluators={
        'stop': stop_after_attempt(3),  # Retry evaluators up to 3 times
        'wait': wait_exponential(multiplier=1, min=0.5, max=10),
        'reraise': True,
    },
)
```

## Practical Examples

### Rate Limit Handling

```python
from tenacity import stop_after_attempt, wait_exponential

from pydantic_evals import Case, Dataset
from pydantic_evals.evaluators import LLMJudge


async def expensive_llm_call(inputs: str) -> str:
    return f'LLM response: {inputs}'


async def llm_task(inputs: str) -> str:
    """Task that might hit rate limits."""
    return await expensive_llm_call(inputs)


dataset = Dataset(
    cases=[Case(inputs='test')],
    evaluators=[
        LLMJudge(rubric='Quality check'),  # Also might hit rate limits
    ],
)

# Generous retries for rate limits
report = dataset.evaluate_sync(
    task=llm_task,
    retry_task={
        'stop': stop_after_attempt(10),  # Rate limits can take multiple retries
        'wait': wait_exponential(multiplier=2, min=2, max=60),  # Start at 2s, exponential up to 60s
        'reraise': True,
    },
    retry_evaluators={
        'stop': stop_after_attempt(5),
        'wait': wait_exponential(multiplier=2, min=2, max=30),
        'reraise': True,
    },
)
```

### Network Timeout Handling

```python
import httpx
from tenacity import stop_after_attempt, wait_exponential

from pydantic_evals import Case, Dataset


async def api_task(inputs: str) -> str:
    """Task that calls an external API which might time out."""
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.post('https://api.example.com', json={'input': inputs})
        return response.text


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

# Quick retries for network issues
report = dataset.evaluate_sync(
    task=api_task,
    retry_task={
        'stop': stop_after_attempt(4),  # A few quick retries
        'wait': wait_exponential(multiplier=0.5, min=0.5, max=5),  # Fast retry, capped at 5s
        'reraise': True,
    },
)
```

### Context Length Handling

```python
from tenacity import stop_after_attempt

from pydantic_evals import Case, Dataset


class ContextLengthError(Exception):
    pass


async def llm_call(inputs: str, max_tokens: int = 8000) -> str:
    return f'LLM response: {inputs[:100]}'


async def smart_llm_task(inputs: str) -> str:
    """Task that might exceed context length."""
    try:
        return await llm_call(inputs, max_tokens=8000)
    except ContextLengthError:
        # Retry with shorter context
        truncated_inputs = inputs[:4000]
        return await llm_call(truncated_inputs, max_tokens=4000)


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

# Don't retry context length errors (handle them in the task)
report = dataset.evaluate_sync(
    task=smart_llm_task,
    retry_task={'stop': stop_after_attempt(1)},  # No retries, we handle it
)
```

## Retry vs Error Handling

**Use retries for:**

- Transient failures (rate limits, timeouts)
- Network issues
- Temporary service outages
- Recoverable errors

**Use error handling for:**

- Validation errors
- Logic errors
- Permanent failures
- Expected error conditions

```python
class RateLimitError(Exception):
    pass


async def llm_call(inputs: str) -> str:
    return f'LLM response: {inputs}'


def is_valid(result: str) -> bool:
    return len(result) > 0


async def smart_task(inputs: str) -> str:
    """Handle expected errors, let retries handle transient failures."""
    try:
        result = await llm_call(inputs)
        # Validate output (don't retry validation errors)
        if not is_valid(result):
            return 'ERROR: Invalid output format'
        return result
    except RateLimitError:
        # Let retry logic handle this
        raise
    except ValueError as e:
        # Don't retry - this is a permanent error
        return f'ERROR: {e}'
```

## Troubleshooting

### "Still failing after retries"

Increase retry attempts or check whether the error is retriable:

```python
import logging

from tenacity import stop_after_attempt

from pydantic_evals import Case, Dataset


def task(inputs: str) -> str:
    return f'Result: {inputs}'


# Add logging to see what's failing
logging.basicConfig(level=logging.DEBUG)

dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

# Tenacity logs retry attempts
report = dataset.evaluate_sync(task, retry_task={'stop': stop_after_attempt(5)})
```

### "Evaluations taking too long"

Reduce retry attempts or wait times:

```python
from tenacity import stop_after_attempt, wait_exponential

# Faster retries
retry_config = {
    'stop': stop_after_attempt(3),  # Fewer attempts
    'wait': wait_exponential(multiplier=0.1, min=0.1, max=2),  # Quick retries, capped at 2s
    'reraise': True,
}
```

### "Hitting rate limits despite retries"

Increase delays or use `max_concurrency`:

```python
from tenacity import stop_after_attempt, wait_exponential

from pydantic_evals import Case, Dataset


def task(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

# Longer delays
retry_config = {
    'stop': stop_after_attempt(5),
    'wait': wait_exponential(multiplier=5, min=5, max=60),  # Start at 5s, exponential up to 60s
    'reraise': True,
}

# Also reduce concurrency
report = dataset.evaluate_sync(
    task=task,
    retry_task=retry_config,
    max_concurrency=2,  # Only 2 concurrent tasks
)
```

## Next Steps

- **[Concurrency & Performance](concurrency.md)** - Optimize evaluation performance
- **[Logfire Integration](logfire-integration.md)** - View retries in Logfire
