from __future__ import annotations
import json
import re
import hashlib
from typing import Any
# Matches a line that consists solely of a Markdown code-fence marker:
# optional whitespace, three backticks, an optional "json" tag (any letter
# case, because of re.IGNORECASE), and optional trailing whitespace.
FENCE_RE = re.compile(r"^\s*```(?:json)?\s*$", re.IGNORECASE)
def sha256_text(s: str) -> str:
    """Return the lowercase hex SHA-256 digest of ``s`` encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(s.encode("utf-8"))
    return hasher.hexdigest()
def _balanced_end(s: str, start: int) -> int | None:
    """Return the index of the bracket closing the one at ``s[start]``, or None.

    Only the bracket type found at ``start`` is counted. Brackets that appear
    inside double-quoted strings (honouring backslash escapes) are ignored.
    None means the text ended before the nesting depth returned to zero.
    """
    opener = s[start]
    closer = "]" if opener == "[" else "}"
    depth = 0
    in_str = False
    esc = False
    for j in range(start, len(s)):
        c = s[j]
        if in_str:
            if esc:
                # The character after a backslash is consumed verbatim.
                esc = False
            elif c == "\\":
                esc = True
            elif c == '"':
                in_str = False
        elif c == '"':
            in_str = True
        elif c == opener:
            depth += 1
        elif c == closer:
            depth -= 1
            if depth == 0:
                return j
    return None


def extract_first_json(text: str) -> Any:
    """
    Best-effort extraction of the first JSON object/array from a text blob.

    - Strips a single markdown fence wrapper if the text starts with a fence
      line and its last line starts with three backticks.
    - Scans for '{' or '[' and bracket-matches to the end of that value.
    - A balanced span that is not valid JSON (for example a citation such as
      "[1]..." written as "[see note]" in surrounding prose) is skipped and
      the search resumes after it, so leading prose does not hide the payload.

    Returns the decoded JSON value of the first span that parses.

    Raises:
        ValueError: if ``text`` is None, if no '{' or '[' is present, or if
            the first candidate is never closed ("unterminated JSON payload").
        json.JSONDecodeError: (a ValueError subclass) if balanced spans were
            found but none of them is valid JSON; the error from the first
            such span is raised.
    """
    if text is None:
        raise ValueError("no text")
    s = text.strip()

    # Unwrap a simple fenced block: opening fence on the first line, closing
    # fence on the last line.
    lines = s.splitlines()
    if lines and FENCE_RE.match(lines[0]) and lines[-1].strip().startswith("```"):
        s = "\n".join(lines[1:-1]).strip()

    first_error = None
    pos = 0
    while True:
        start = next((i for i in range(pos, len(s)) if s[i] in "[{"), None)
        if start is None:
            break
        end = _balanced_end(s, start)
        if end is None:
            # Everything up to the end of the text belongs to this unclosed
            # value; retrying inside it would only surface a nested fragment
            # of a truncated payload, so stop here.
            break
        try:
            return json.loads(s[start:end + 1])
        except json.JSONDecodeError as exc:
            if first_error is None:
                first_error = exc
            # Treat the whole balanced span as prose and look past it.
            pos = end + 1

    if first_error is not None:
        raise first_error
    if start is None:
        raise ValueError("no JSON opener found")
    raise ValueError("unterminated JSON payload")
def _parse_or_none(raw: str) -> Any | None:
    """Return the first JSON value embedded in ``raw``, or None if extraction fails."""
    try:
        return extract_first_json(raw)
    except Exception:
        # Deliberately best-effort: an answer that cannot be parsed is
        # reported as None instead of aborting the load.
        return None


def load_answer_from_artifact(path: str) -> tuple[str, Any | None]:
    """
    Load a model answer from an artifact file or directory.

    Accepts:
    - directory: tries a fixed list of common filenames in order and loads
      the first one that exists
    - path ending in ".json":
        * a dict with a non-None "answer_json" -> that value (raw text is its
          JSON serialization)
        * else a dict with a string "answer" -> that string, parsed
          best-effort
        * otherwise the whole document is the answer
    - any other file: read as UTF-8 text (undecodable bytes replaced) and
      parsed best-effort

    Returns (raw_text, parsed_json_or_none).

    Raises:
        FileNotFoundError: if ``path`` is a directory containing none of the
            known artifact filenames (or the file itself cannot be opened).
        json.JSONDecodeError: if a ".json" file does not contain valid JSON.
    """
    import os

    # Accept os.PathLike as well as str; str input is returned unchanged.
    path = os.fspath(path)

    if os.path.isdir(path):
        for name in ("rlm.json", "baseline.json", "result.json", "output.json", "rlm_answer.txt", "baseline_answer.txt", "answer.txt"):
            cand = os.path.join(path, name)
            if os.path.exists(cand):
                return load_answer_from_artifact(cand)
        raise FileNotFoundError(f"no known artifact files in dir: {path}")

    if path.endswith(".json"):
        # Context manager so the handle is closed even if decoding fails.
        with open(path, "r", encoding="utf-8") as fh:
            obj = json.load(fh)
        # Common locations
        if isinstance(obj, dict):
            if obj.get("answer_json") is not None:
                structured = obj["answer_json"]
                return (json.dumps(structured, ensure_ascii=False), structured)
            if isinstance(obj.get("answer"), str):
                raw = obj["answer"]
                return (raw, _parse_or_none(raw))
        # If the JSON file itself is the answer
        return (json.dumps(obj, ensure_ascii=False), obj)

    with open(path, "r", encoding="utf-8", errors="replace") as fh:
        raw = fh.read()
    return (raw, _parse_or_none(raw))