#!/usr/bin/env python
"""Sweep-benchmark RLM-MCP vs baseline across OpenRouter models.
This script auto-selects "good but inexpensive" OpenRouter models (or uses an
explicit list), then runs the existing v2 benchmark loop for each model.
Requires:
- OPENROUTER_API_KEY set in env
- rlm_mcp_server running locally is *not* required; the script starts an in-process
MCP server exactly like bench_tokens_v2.py.
Why this exists
- openrouter has a large and moving model catalog; hardcoding candidates rots.
- you want comparative curves (quality vs cost vs latency) over a stable task.
Implementation notes
- OpenRouter /models pricing is in USD *per token* (strings), not per 1M.
- If you need billed cost, query /generation with the completion id. This script
can do that, but it is best-effort and may be rate-limited.
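
Example invocation (script path, query, and globs are illustrative, not part of the repo):

    python <path-to-this-script> --auto --limit 8 --query "Where is retry handled?" --globs "src/**/*.py" --dump-dir out/or_sweep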
"""
from __future__ import annotations
import argparse
import asyncio
import csv
import json
import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
import requests
# --- OpenRouter API ---
OPENROUTER_MODELS_URL = "https://openrouter.ai/api/v1/models"
OPENROUTER_GENERATION_URL = "https://openrouter.ai/api/v1/generation"
def _float(x: Any, default: float = 0.0) -> float:
try:
return float(x)
except Exception:
return default
@dataclass(frozen=True)
class OrModel:
id: str
context_length: int
prompt_cost_per_tok: float
completion_cost_per_tok: float
@property
def blended_cost_per_tok(self) -> float:
# crude tie-breaker for sorting
return self.prompt_cost_per_tok + self.completion_cost_per_tok
def fetch_openrouter_models(api_key: str) -> List[OrModel]:
r = requests.get(OPENROUTER_MODELS_URL, headers={"Authorization": f"Bearer {api_key}"}, timeout=30)
r.raise_for_status()
data = r.json().get("data", [])
out: List[OrModel] = []
for m in data:
pricing = m.get("pricing") or {}
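        # pricing values are strings in USD per token; typical shape (assumed), e.g.
        #   {"prompt": "0.0000007", "completion": "0.0000007", ...}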
out.append(
OrModel(
id=m.get("id", ""),
context_length=int(m.get("context_length") or 0),
prompt_cost_per_tok=_float(pricing.get("prompt")),
completion_cost_per_tok=_float(pricing.get("completion")),
)
)
return [m for m in out if m.id]
def pick_candidates(
models: List[OrModel],
*,
min_ctx: int,
max_prompt_cost: float,
max_completion_cost: float,
include_regex: str,
limit: int,
) -> List[OrModel]:
import re
pat = re.compile(include_regex) if include_regex else None
out: List[OrModel] = []
for m in models:
if m.context_length < min_ctx:
continue
if max_prompt_cost > 0 and m.prompt_cost_per_tok > max_prompt_cost:
continue
if max_completion_cost > 0 and m.completion_cost_per_tok > max_completion_cost:
continue
if pat and not pat.search(m.id):
continue
out.append(m)
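    # sort: cheapest blended per-token price first, then larger context, then id for a stable order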
out.sort(key=lambda x: (x.blended_cost_per_tok, -x.context_length, x.id))
return out[:limit]
def maybe_fetch_billed_cost(api_key: str, completion_id: Optional[str]) -> Optional[float]:
if not completion_id:
return None
# OpenRouter suggests querying /generation for accurate cost. Best-effort here.
try:
r = requests.get(
OPENROUTER_GENERATION_URL,
params={"id": completion_id},
headers={"Authorization": f"Bearer {api_key}"},
timeout=30,
)
if r.status_code != 200:
return None
j = r.json() or {}
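        # assumed response shape: {"data": {"total_cost": <USD float>, ...}}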
data = j.get("data") or {}
cost = data.get("total_cost")
return float(cost) if cost is not None else None
except Exception:
return None
# --- Load existing v2 benchmark helpers dynamically (bench/ isn't a package) ---
def load_bench_v2_module(repo_root: Path):
import importlib.util
p = repo_root / "bench" / "bench_tokens.py"
spec = importlib.util.spec_from_file_location("bench_tokens", p)
if spec is None or spec.loader is None:
raise RuntimeError(f"Could not load {p}")
mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)  # loader is non-None: checked above
return mod
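# the loaded module is expected to expose ingest_context, baseline_call_openai_compatible,
# and mcp_rlm_call; those are the only helpers used below.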
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--query", required=True)
ap.add_argument("--globs", nargs="+", required=True)
ap.add_argument("--environment", default="local", choices=["local", "docker"])
ap.add_argument("--dump-dir", required=True, help="Directory to write sweep_results.json and artifacts")
ap.add_argument("--models", default="", help="Comma-separated explicit OpenRouter model ids")
ap.add_argument("--auto", action="store_true", help="Auto-pick candidates from OpenRouter /models")
ap.add_argument("--limit", type=int, default=12)
ap.add_argument("--min-ctx", type=int, default=16384)
ap.add_argument("--include", default=r"(?i)(qwen|deepseek|gemini|claude|gpt|llama).*(coder|instruct|flash)?")
ap.add_argument("--max-prompt-cost", type=float, default=0.0, help="USD per token")
ap.add_argument("--max-completion-cost", type=float, default=0.0, help="USD per token")
ap.add_argument("--other-model", default=None, help="Recursion/subcall model for RLM")
ap.add_argument("--rlm-max-iterations", type=int, default=6)
ap.add_argument("--baseline-max-output-tokens", type=int, default=512)
ap.add_argument("--temperature", type=float, default=0.0)
ap.add_argument("--fetch-billed-cost", action="store_true", help="Query /generation for billed cost (best-effort)")
args = ap.parse_args()
api_key = os.environ.get("OPENROUTER_API_KEY")
if not api_key:
raise SystemExit("OPENROUTER_API_KEY is required")
repo_root = Path(__file__).resolve().parents[1]
mod = load_bench_v2_module(repo_root)
dump_root = Path(args.dump_dir)
dump_root.mkdir(parents=True, exist_ok=True)
# candidate selection
if args.models:
candidates = [m.strip() for m in args.models.split(",") if m.strip()]
else:
if not args.auto:
raise SystemExit("Provide --models or --auto")
catalog = fetch_openrouter_models(api_key)
picked = pick_candidates(
catalog,
min_ctx=args.min_ctx,
max_prompt_cost=args.max_prompt_cost,
max_completion_cost=args.max_completion_cost,
include_regex=args.include,
limit=args.limit,
)
candidates = [m.id for m in picked]
results: List[Dict[str, Any]] = []
# pre-ingest once (shared across models)
ingested_context: str = mod.ingest_context(args.globs)
(dump_root / "ingested_context.json").write_text(
json.dumps({"globs": args.globs, "context": ingested_context}, indent=2),
encoding="utf-8",
)
for model_id in candidates:
        # model ids contain "/" and may carry ":" variant suffixes (e.g. ":free"); keep dir names filesystem-safe
        run_dir = dump_root / model_id.replace("/", "__").replace(":", "_")
run_dir.mkdir(parents=True, exist_ok=True)
# baseline
t0 = time.time()
baseline = mod.baseline_call_openai_compatible(
provider_preset="openrouter",
model=model_id,
prompt=ingested_context,
query=args.query,
temperature=args.temperature,
max_output_tokens=args.baseline_max_output_tokens,
)
baseline_wall = time.time() - t0
# RLM MCP
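        # tool_args mirrors the argument schema that mcp_rlm_call in the v2 bench module
        # forwards to the MCP tool: model choices, iteration cap, environment, metrics toggle.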
tool_args = {
"query": args.query,
"globs": args.globs,
"provider_preset": "openrouter",
"rlm": {
"model_name": model_id,
"other_model_name": args.other_model,
"timeout_sec": 1200,
"max_iterations": args.rlm_max_iterations,
},
"environment": args.environment,
"output": {"include_metrics": True},
"temperature": args.temperature,
}
t1 = time.time()
        # resolve the server path against the repo root so the sweep is CWD-independent
        rlm_out, structured = asyncio.run(mod.mcp_rlm_call(str(repo_root / "rlm_mcp_server" / "server.py"), tool_args))
rlm_wall = time.time() - t1
row = {
"model": model_id,
"other_model": args.other_model,
"baseline_wall_sec": baseline_wall,
"baseline_usage": baseline.get("usage"),
"rlm_wall_sec": rlm_wall,
"rlm_metrics": rlm_out.get("metrics"),
"rlm_completion_id": (rlm_out.get("metrics") or {}).get("completion_id"),
}
if args.fetch_billed_cost:
row["billed_cost_usd"] = maybe_fetch_billed_cost(api_key, row.get("rlm_completion_id"))
(run_dir / "baseline.json").write_text(json.dumps(baseline, indent=2), encoding="utf-8")
(run_dir / "rlm.json").write_text(json.dumps({"out": rlm_out, "structured": structured}, indent=2), encoding="utf-8")
results.append(row)
# write summary
(dump_root / "sweep_results.json").write_text(json.dumps(results, indent=2), encoding="utf-8")
# flat CSV
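    # note: baseline usage uses the OpenAI-compatible keys (prompt_tokens/completion_tokens),
    # while RLM token_usage reports input_tokens/output_tokens; both map onto the
    # *_prompt_tokens / *_completion_tokens columns below.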
csv_path = dump_root / "sweep_summary.csv"
cols = [
"model",
"other_model",
"baseline_wall_sec",
"baseline_total_tokens",
"baseline_prompt_tokens",
"baseline_completion_tokens",
"rlm_wall_sec",
"rlm_total_tokens",
"rlm_prompt_tokens",
"rlm_completion_tokens",
"billed_cost_usd",
]
with csv_path.open("w", newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=cols)
w.writeheader()
for r in results:
bu = r.get("baseline_usage") or {}
rm = r.get("rlm_metrics") or {}
tu = (rm.get("token_usage") or {})
w.writerow(
{
"model": r.get("model"),
"other_model": r.get("other_model"),
"baseline_wall_sec": r.get("baseline_wall_sec"),
"baseline_total_tokens": bu.get("total_tokens"),
"baseline_prompt_tokens": bu.get("prompt_tokens"),
"baseline_completion_tokens": bu.get("completion_tokens"),
"rlm_wall_sec": r.get("rlm_wall_sec"),
"rlm_total_tokens": tu.get("total_tokens"),
"rlm_prompt_tokens": tu.get("input_tokens"),
"rlm_completion_tokens": tu.get("output_tokens"),
"billed_cost_usd": r.get("billed_cost_usd"),
}
)
print(f"Wrote: {dump_root / 'sweep_results.json'}")
print(f"Wrote: {csv_path}")
return 0
if __name__ == "__main__":
raise SystemExit(main())