"""Evaluation service implementation."""

import json
import logging
import re
from typing import Dict, List, Set, Tuple

import jsonschema

logger = logging.getLogger(__name__)


class EvalService:
"""
Service for evaluating AI responses against expectations.
Supports:
- Regex guards
- JSON Schema validation
- Semantic similarity (token-based Jaccard)
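
    Example (illustrative sketch; the values below are made up):

        service = EvalService()
        passed, score, message, refs, metrics, failed = await service.score(
            actual_text="CPU usage is 85%",
            expected_references=["CPU usage"],
            expected_metrics=["85"],
            guards=[{"type": "regex", "pattern": "[0-9]+%"}],
            run_id="run-123",
        )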
"""
async def score(
self,
actual_text: str,
expected_references: List[str],
expected_metrics: List[str],
guards: List[Dict[str, str]],
run_id: str,
    ) -> Tuple[bool, float, str, List[str], List[str], List[str]]:
"""
Evaluate actual text against expectations.
Args:
actual_text: Text to evaluate
expected_references: Expected reference strings
expected_metrics: Expected metric names
            guards: Validation guards; each is a dict with a "type" of
                "regex" or "json_schema" (see example below)
run_id: Scenario run identifier
Returns:
Tuple of (passed, score, message, matched_refs, matched_metrics, failed_guards)
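
        Example guards (illustrative values only):
            [{"type": "regex", "pattern": "error rate"},
             {"type": "json_schema", "schema": '{"type": "object"}'}]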
"""
logger.info(f"Evaluating response for run {run_id}")
matched_references = []
matched_metrics = []
failed_guards = []
# Check guards
for guard in guards:
guard_type = guard.get("type", "")
            if guard_type == "regex":
                pattern = guard.get("pattern", "")
                try:
                    if not re.search(pattern, actual_text, re.IGNORECASE):
                        failed_guards.append(f"regex:{pattern}")
                except re.error as e:
                    logger.error(f"Invalid regex guard {pattern!r}: {e}")
                    failed_guards.append(f"regex:invalid_pattern:{pattern}")
elif guard_type == "json_schema":
schema_str = guard.get("schema", "{}")
try:
schema = json.loads(schema_str)
# Try to parse actual_text as JSON
try:
data = json.loads(actual_text)
jsonschema.validate(data, schema)
except json.JSONDecodeError:
failed_guards.append("json_schema:invalid_json")
except jsonschema.ValidationError as e:
failed_guards.append(f"json_schema:{e.message}")
except Exception as e:
logger.error(f"Guard validation error: {e}")
failed_guards.append(f"json_schema:error")
# Check references (case-insensitive substring match)
actual_lower = actual_text.lower()
for ref in expected_references:
if ref.lower() in actual_lower:
matched_references.append(ref)
# Check metrics (case-insensitive substring match)
for metric in expected_metrics:
if metric.lower() in actual_lower:
matched_metrics.append(metric)
# Calculate score using token-based Jaccard similarity
score = self._calculate_similarity(
actual_text, expected_references + expected_metrics
)
# Determine pass/fail
ref_ratio = (
len(matched_references) / len(expected_references)
if expected_references
else 1.0
)
metric_ratio = (
len(matched_metrics) / len(expected_metrics)
if expected_metrics
else 1.0
)
passed = (
len(failed_guards) == 0
and ref_ratio >= 0.5 # At least 50% references matched
and metric_ratio >= 0.5 # At least 50% metrics matched
)
message = (
f"Score: {score:.2f}, Refs: {len(matched_references)}/{len(expected_references)}, "
f"Metrics: {len(matched_metrics)}/{len(expected_metrics)}, "
f"Guards: {len(failed_guards)} failed"
)
logger.info(f"Evaluation result: {message}")
return passed, score, message, matched_references, matched_metrics, failed_guards
def _calculate_similarity(self, text: str, expected: List[str]) -> float:
"""
Calculate token-based Jaccard similarity.
Args:
text: Actual text
expected: Expected strings
Returns:
float: Similarity score (0-1)
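
        Computed as |intersection| / |union| over token sets, e.g.
        {"cpu", "usage"} vs. {"cpu", "load"} gives 1/3.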
"""
if not expected:
return 1.0
# Tokenize
actual_tokens = self._tokenize(text)
expected_tokens = set()
for exp in expected:
expected_tokens.update(self._tokenize(exp))
if not actual_tokens or not expected_tokens:
return 0.0
# Jaccard similarity
intersection = actual_tokens.intersection(expected_tokens)
union = actual_tokens.union(expected_tokens)
return len(intersection) / len(union) if union else 0.0
def _tokenize(self, text: str) -> Set[str]:
"""
Tokenize text into lowercase words.
Args:
text: Text to tokenize
Returns:
Set of tokens
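
        Example: "CPU usage: 85%" -> {"cpu", "usage", "85"}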
"""
# Simple word tokenization
words = re.findall(r"\w+", text.lower())
return set(words)
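

if __name__ == "__main__":
    # Minimal manual smoke test for EvalService.score. This is an
    # illustrative sketch only: the sample text, references, metrics,
    # guard, and run id below are made-up values, not fixtures from
    # the project.
    import asyncio

    async def _demo() -> None:
        service = EvalService()
        result = await service.score(
            actual_text='{"cpu_usage": 85, "status": "ok"}',
            expected_references=["cpu_usage"],
            expected_metrics=["status"],
            guards=[{"type": "regex", "pattern": "cpu_usage"}],
            run_id="demo-run",
        )
        passed, score, message, refs, metrics, failed = result
        print(message, "passed" if passed else "failed")

    asyncio.run(_demo())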