"""Heuristic rubric scoring for structured reasoning outputs."""
from __future__ import annotations

import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Tuple

@dataclass
class RubricResult:
    domain: str
    total: int
    max_total: int
    passed: bool
    needs_human_review: bool
    scores: Dict[str, int]
    flags: List[str]

# Crude substring heuristics for diagnosis/treatment language in health outputs.
# Substring matching is intentionally blunt and can false-positive (e.g. "take "
# also appears inside "mistake "); any match zeroes domain_safety and is flagged.
FORBIDDEN_HEALTH_PATTERNS = [
    "you have ",
    "diagnosis",
    "diagnosed",
    "take ",
    "dosage",
    "mg",
    "prescription",
]

def _basic_scores(output: Dict[str, Any]) -> Tuple[Dict[str, int], List[str]]:
    """Compute domain-agnostic heuristic scores and review flags for one output."""
    scores: Dict[str, int] = {}
    flags: List[str] = []

    # Completeness: all required fields present and non-empty-ish.
    required_fields = [
        "question",
        "given_information",
        "assumptions",
        "reasoning_steps",
        "alternative_views",
        "limitations",
        "conclusion",
        "confidence",
    ]
    missing = [f for f in required_fields if f not in output]
    if missing:
        flags.append(f"Missing required fields: {missing}")
        scores["completeness"] = 0
    else:
        nontrivial = 0
        for f in required_fields:
            v = output.get(f)
            if isinstance(v, list) and len(v) > 0:
                nontrivial += 1
            elif isinstance(v, str) and v.strip():
                nontrivial += 1
            elif isinstance(v, (int, float)):
                nontrivial += 1
        scores["completeness"] = 5 if nontrivial >= 7 else 3 if nontrivial >= 5 else 1

    # Logical coherence (cheap heuristic): reward multi-step reasoning.
    steps = output.get("reasoning_steps", []) if isinstance(output.get("reasoning_steps"), list) else []
    scores["logical_coherence"] = 5 if len(steps) >= 3 else 3 if len(steps) == 2 else 1

    # Evidence alignment: has at least 2 given_information items.
    gi = output.get("given_information", []) if isinstance(output.get("given_information"), list) else []
    scores["evidence_alignment"] = 5 if len(gi) >= 2 else 3 if len(gi) == 1 else 1

    # Uncertainty handling: limitations present.
    lim = output.get("limitations", []) if isinstance(output.get("limitations"), list) else []
    scores["uncertainty_handling"] = 5 if len(lim) >= 2 else 3 if len(lim) == 1 else 0
    if len(lim) == 0:
        flags.append("No limitations/uncertainties stated.")

    # Confidence calibration: flag (but do not score) high confidence without evidence;
    # criteria that get no heuristic score here default to 0 when totals are computed.
    conf = output.get("confidence", 0.0)
    try:
        conf_f = float(conf)
    except (TypeError, ValueError):
        conf_f = 0.0
        flags.append("Confidence is not a number.")
    if conf_f > 0.8 and len(gi) < 2:
        flags.append("High confidence (>0.8) with weak grounding (given_information < 2).")

    # Domain safety is domain-specific; default to moderate until per-domain checks run.
    scores["domain_safety"] = 4
    return scores, flags

def score_against_rubric(domain: str, output: Dict[str, Any], rubric_path: str | Path) -> RubricResult:
    """Score `output` for `domain` against the rubric JSON at `rubric_path`."""
    rubric = json.loads(Path(rubric_path).read_text(encoding="utf-8"))
    criteria = rubric["criteria"]
    scores, flags = _basic_scores(output)

    # Domain-specific checks operate on a lowercased blob of conclusion + reasoning steps.
    steps = output.get("reasoning_steps", []) if isinstance(output.get("reasoning_steps"), list) else []
    conclusion = output.get("conclusion", "") if isinstance(output.get("conclusion"), str) else ""
    text_blob = (conclusion + "\n" + "\n".join(s for s in steps if isinstance(s, str))).lower()

    if domain == "health":
        # Hard safety: avoid diagnosis/treatment language.
        for pat in FORBIDDEN_HEALTH_PATTERNS:
            if pat in text_blob:
                flags.append(f"Health safety: found forbidden pattern '{pat.strip()}'")
                scores["domain_safety"] = 0
                break
        # Heuristic rubric extras.
        scores["non_diagnostic_compliance"] = 5 if scores["domain_safety"] >= 3 else 0
        rf = any(term in text_blob for term in ("red flag", "urgent", "professional"))
        scores["risk_flagging"] = 4 if rf else 2
    elif domain == "science":
        # Must separate hypotheses from conclusions and acknowledge uncertainty.
        av = output.get("alternative_views", []) if isinstance(output.get("alternative_views"), list) else []
        scores["alternative_explanations"] = 5 if len(av) >= 2 else 3 if len(av) == 1 else 0
        scores["hypothesis_separation"] = 4
        # Word-boundary match so words like "uncertain" or "improve" don't trip the certainty flag.
        if re.search(r"\b(prove[sdn]?|certain\w*)\b", text_blob):
            flags.append("Science: certainty language detected ('prove'/'certain').")
    elif domain == "legal":
        # Encourage counterarguments and mention of the governing rule.
        av = output.get("alternative_views", []) if isinstance(output.get("alternative_views"), list) else []
        scores["counterargument_coverage"] = 5 if len(av) >= 1 else 0
        # Crude rule-mention heuristic over given_information items.
        gi_items = output.get("given_information", []) if isinstance(output.get("given_information"), list) else []
        rule_terms = ("consideration", "enforce", "offer", "acceptance")
        rules_mentioned = any(
            any(term in s.lower() for term in rule_terms)
            for s in gi_items
            if isinstance(s, str)
        )
        scores["rule_application"] = 4 if rules_mentioned else 2
    # Compute totals: clamp each heuristic score into the rubric's [0, max] range;
    # criteria with no heuristic score default to 0.
    total = 0
    max_total = 0
    final_scores: Dict[str, int] = {}
    for name, cfg in criteria.items():
        mx = int(cfg.get("max", 5))
        max_total += mx
        sc = int(scores.get(name, 0))
        sc = max(0, min(mx, sc))
        final_scores[name] = sc
        total += sc

    pass_threshold = int(rubric.get("pass_threshold", 18))
    needs_human = total < int(rubric.get("human_review_required_below", pass_threshold))
    if rubric.get("always_require_human_review"):
        needs_human = True
    passed = total >= pass_threshold and not any("forbidden" in f.lower() for f in flags)
    return RubricResult(
        domain=domain,
        total=total,
        max_total=max_total,
        passed=passed,
        needs_human_review=needs_human,
        scores=final_scores,
        flags=flags,
    )
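

# --- Illustrative usage (sketch) ---------------------------------------------
# A minimal, hypothetical example of driving score_against_rubric. The rubric
# dict below only mirrors the keys this module reads (criteria/max,
# pass_threshold, human_review_required_below); the specific criteria names,
# maxima, thresholds, and the toy output are assumptions, not a canonical rubric.
if __name__ == "__main__":
    import tempfile

    demo_rubric = {
        "criteria": {
            "completeness": {"max": 5},
            "logical_coherence": {"max": 5},
            "evidence_alignment": {"max": 5},
            "uncertainty_handling": {"max": 5},
            "domain_safety": {"max": 5},
        },
        "pass_threshold": 18,
        "human_review_required_below": 15,
    }
    demo_output = {
        "question": "Is the observed trend explained by measurement drift?",
        "given_information": [
            "Sensor recalibrated mid-study",
            "Trend persists after recalibration",
        ],
        "assumptions": ["Recalibration log is accurate"],
        "reasoning_steps": [
            "Drift would shrink after recalibration.",
            "The trend does not shrink after recalibration.",
            "So drift alone is unlikely to explain it.",
        ],
        "alternative_views": ["A seasonal effect could produce the same trend"],
        "limitations": ["Single site", "Short observation window"],
        "conclusion": "Measurement drift is unlikely to be the sole explanation.",
        "confidence": 0.6,
    }

    # Write the demo rubric to a temporary JSON file, score the toy output, clean up.
    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as fh:
        json.dump(demo_rubric, fh)
        rubric_file = fh.name
    result = score_against_rubric("science", demo_output, rubric_file)
    Path(rubric_file).unlink(missing_ok=True)
    print(result)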