#!/usr/bin/env python3
"""Fixture scorer for RLM benchmark outputs.
Scores a model/tool output against gold.json emitted by fixture_gen.py.
Supports input as:
- raw text output (file)
- benchmark artifact JSON emitted by bench_tokens_v2.py / bench_openrouter_sweep.py
(expects keys like answer / answer_json)
Emits a JSON report to stdout.
"""
from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
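# Matches a ```/```json fenced block whose payload is a single {...} or [...]
# and captures just the payload.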
_JSON_FENCE_RE = re.compile(r"```(?:json)?\s*(\{.*?\}|\[.*?\])\s*```", re.DOTALL | re.IGNORECASE)
def _extract_first_json(text: str) -> Optional[str]:
"""Best-effort extraction: prefer fenced JSON, else first {...} or [...] block."""
m = _JSON_FENCE_RE.search(text)
if m:
return m.group(1).strip()
    # No fenced block: scan for the earliest {...} or [...] block by tracking
    # nesting depth. Braces inside JSON strings are not special-cased, which is
    # acceptable for best-effort extraction.
    starts = [(text.find(ch), ch) for ch in ("{", "[")]
    starts = [(i, ch) for i, ch in starts if i != -1]
    if not starts:
        return None
    start, open_ch = min(starts)
    close_ch = "}" if open_ch == "{" else "]"
    depth = 0
    for i in range(start, len(text)):
        ch = text[i]
        if ch == open_ch:
            depth += 1
        elif ch == close_ch:
            depth -= 1
            if depth == 0:
                return text[start : i + 1].strip()
    return None
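# Illustrative behavior (assumed inputs, not from a test suite):
#   _extract_first_json('```json\n{"a": 1}\n```')        -> '{"a": 1}'
#   _extract_first_json('note [{"a": 1}, {"b": 2}] end') -> '[{"a": 1}, {"b": 2}]'
#   _extract_first_json('no json here')                  -> None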
def _load_output(path: Path) -> Tuple[str, Any]:
"""Return (raw_text, parsed_obj_or_None)."""
raw = path.read_text(encoding="utf-8", errors="replace")
# If it's already JSON, try to parse.
try:
obj = json.loads(raw)
return raw, obj
except Exception:
return raw, None
def _normalize_candidate(obj_or_text: Any, raw_text: str) -> Tuple[Optional[Any], Dict[str, Any]]:
"""Normalize different artifact shapes to the canonical answer object."""
meta: Dict[str, Any] = {}
if isinstance(obj_or_text, dict):
# Bench artifacts often have answer_json already.
if "answer_json" in obj_or_text and isinstance(obj_or_text["answer_json"], (dict, list)):
meta["source"] = "artifact.answer_json"
return obj_or_text["answer_json"], meta
if "answer" in obj_or_text and isinstance(obj_or_text["answer"], str):
extracted = _extract_first_json(obj_or_text["answer"]) or ""
if extracted:
try:
meta["source"] = "artifact.answer(extracted_json)"
return json.loads(extracted), meta
except Exception:
meta["source"] = "artifact.answer(unparseable_json)"
return None, meta
# Raw text path
extracted = _extract_first_json(raw_text)
if not extracted:
meta["source"] = "raw(no_json_found)"
return None, meta
try:
meta["source"] = "raw(extracted_json)"
return json.loads(extracted), meta
except Exception:
meta["source"] = "raw(unparseable_json)"
return None, meta
def _schema_ok(x: Any) -> bool:
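    """Check the canonical answer shape (as enforced below):

    {"env_vars": [str, ...],
     "provider_presets": [{"name": str, "required_env_vars": [str, ...]}, ...]}
    """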
if not isinstance(x, dict):
return False
if "env_vars" not in x or "provider_presets" not in x:
return False
if not isinstance(x["env_vars"], list) or not all(isinstance(s, str) for s in x["env_vars"]):
return False
if not isinstance(x["provider_presets"], list):
return False
for p in x["provider_presets"]:
if not isinstance(p, dict):
return False
if "name" not in p or "required_env_vars" not in p:
return False
if not isinstance(p["name"], str):
return False
if not isinstance(p["required_env_vars"], list) or not all(isinstance(s, str) for s in p["required_env_vars"]):
return False
return True
def _f1(tp: int, fp: int, fn: int) -> Dict[str, float]:
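    """Precision/recall/F1 from true-positive/false-positive/false-negative counts.

    E.g. tp=8, fp=2, fn=4 -> precision 0.8, recall ~0.667, f1 ~0.727.
    """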
prec = tp / (tp + fp) if (tp + fp) else 0.0
rec = tp / (tp + fn) if (tp + fn) else 0.0
f1 = (2 * prec * rec) / (prec + rec) if (prec + rec) else 0.0
return {"precision": prec, "recall": rec, "f1": f1}
def _set_scores(gold: Set[str], pred: Set[str]) -> Dict[str, Any]:
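    """Set-overlap scores plus sorted diffs.

    E.g. gold={"A", "B", "C"}, pred={"B", "C", "D"} -> tp=2, fp=1, fn=1,
    precision = recall = f1 = 2/3, missing=["A"], extra=["D"].
    """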
tp = len(gold & pred)
fp = len(pred - gold)
fn = len(gold - pred)
out: Dict[str, Any] = {"tp": tp, "fp": fp, "fn": fn}
out.update(_f1(tp, fp, fn))
out["missing"] = sorted(gold - pred)
out["extra"] = sorted(pred - gold)
return out
def main() -> int:
    ap = argparse.ArgumentParser(description="Score a model output or bench artifact against a fixture gold.json.")
ap.add_argument("--gold", type=Path, required=True, help="Path to gold.json")
ap.add_argument("--output", type=Path, required=True, help="Path to model output or bench artifact JSON")
args = ap.parse_args()
gold_obj = json.loads(args.gold.read_text(encoding="utf-8"))
gold_env = set(gold_obj.get("env_vars", []))
gold_presets = {p["name"]: set(p.get("required_env_vars", [])) for p in gold_obj.get("provider_presets", [])}
raw_text, parsed = _load_output(args.output)
cand, meta = _normalize_candidate(parsed, raw_text)
report: Dict[str, Any] = {
"parse_ok": 0,
"schema_ok": 0,
"exact_match": 0,
"env_vars": {},
"provider_presets": {},
"meta": meta,
}
if cand is None:
print(json.dumps(report, indent=2, sort_keys=True))
return 0
report["parse_ok"] = 1
report["schema_ok"] = 1 if _schema_ok(cand) else 0
if not report["schema_ok"]:
print(json.dumps(report, indent=2, sort_keys=True))
return 0
pred_env = set(cand.get("env_vars", []))
report["env_vars"] = _set_scores(gold_env, pred_env)
pred_presets_list: List[Dict[str, Any]] = cand.get("provider_presets", [])
pred_presets: Dict[str, Set[str]] = {
p.get("name", ""): set(p.get("required_env_vars", []))
for p in pred_presets_list
if isinstance(p, dict) and isinstance(p.get("name"), str)
}
# preset-name level scoring
report["provider_presets"]["names"] = _set_scores(set(gold_presets.keys()), set(pred_presets.keys()))
# per-preset env var scoring (only where names overlap)
per_name: Dict[str, Any] = {}
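    # ttp/tfp/tfn accumulate micro-averaged env-var counts across presets
    # present in both gold and prediction.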
ttp = tfp = tfn = 0
for name in sorted(set(gold_presets.keys()) | set(pred_presets.keys())):
g = gold_presets.get(name)
p = pred_presets.get(name)
if g is None:
# extra preset
per_name[name] = {"extra_preset": True, "pred_env_vars": sorted(p or set())}
continue
if p is None:
per_name[name] = {"missing_preset": True, "gold_env_vars": sorted(g)}
continue
s = _set_scores(g, p)
per_name[name] = s
ttp += s["tp"]
tfp += s["fp"]
tfn += s["fn"]
report["provider_presets"]["per_preset"] = per_name
report["provider_presets"]["env_micro"] = {"tp": ttp, "fp": tfp, "fn": tfn, **_f1(ttp, tfp, tfn)}
# exact match: all names match AND all env sets match AND env_vars match
names_ok = report["provider_presets"]["names"]["fp"] == 0 and report["provider_presets"]["names"]["fn"] == 0
env_ok = report["env_vars"]["fp"] == 0 and report["env_vars"]["fn"] == 0
per_ok = True
if names_ok:
for n in gold_presets.keys():
s = per_name.get(n, {})
if s.get("fp") or s.get("fn"):
per_ok = False
break
else:
per_ok = False
report["exact_match"] = 1 if (names_ok and env_ok and per_ok) else 0
print(json.dumps(report, indent=2, sort_keys=True))
return 0
if __name__ == "__main__":
raise SystemExit(main())