get_spec_history
Returns archived snapshots with trend deltas (7-day, 30-day) for spec metrics to show improvement over time.
Instructions
Return the last N snapshots archived by get_optimization_plan plus trend deltas (current vs ~7 days ago, vs ~30 days ago) for spec count, untested, quality findings, drift, stranded, and unknown-hash specs. Use when a user asks 'are we improving' / 'show me the trend' / 'how did we do this month'. Requires at least 2 snapshots for trend; degrades gracefully with fewer. Returns {snapshots_total, snapshots[], trend[], markdown}.
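The "~7 days ago" and "~30 days ago" baselines are nearest-snapshot matches rather than exact dates. A standalone sketch of that selection logic (simplified from the handler's internal `_pick` closure shown below, including its 1.5x tolerance window; not the real module):

```python
import datetime as dt

def pick_baseline(snapshots, now, days_ago):
    """Pick the snapshot closest to `now - days_ago` days.

    Returns None when there are no snapshots, or when the closest one is
    further than 1.5x the window away from the target date (mirrors the
    handler's tolerance check).
    """
    target = now - dt.timedelta(days=days_ago)
    best, best_gap = None, None
    for ts, snap in snapshots:
        gap = abs((ts - target).total_seconds())
        if best is None or gap < best_gap:
            best, best_gap = snap, gap
    if best is None or best_gap > days_ago * 86400 * 1.5:
        return None
    return best

now = dt.datetime(2024, 6, 30, tzinfo=dt.timezone.utc)
snaps = [
    (now - dt.timedelta(days=29), {"specs_total": 40}),
    (now - dt.timedelta(days=8), {"specs_total": 45}),
    (now, {"specs_total": 50}),
]
print(pick_baseline(snaps, now, 7))   # {'specs_total': 45}
print(pick_baseline(snaps, now, 30))  # {'specs_total': 40}
```

Because the match is approximate, a repo with irregular snapshot cadence still gets usable trend columns; the tolerance check keeps a brand-new history from reporting a bogus 30-day delta against itself when no old snapshot exists at all.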
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| limit | No | How many of the latest snapshots to return. | 10 |
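Since the schema marks `limit` optional with a default of 10, the handler's coercion of this argument can be sketched as follows (`resolve_limit` is a hypothetical name for illustration; the real handler inlines `int(arguments.get("limit", 10))`):

```python
def resolve_limit(arguments: dict, default: int = 10) -> int:
    """Coerce the optional 'limit' argument: missing -> default,
    numeric strings -> int. Raises ValueError on non-numeric input,
    matching the behavior of int()."""
    return int(arguments.get("limit", default))

print(resolve_limit({}))              # 10
print(resolve_limit({"limit": 5}))    # 5
print(resolve_limit({"limit": "7"}))  # 7
```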
Implementation Reference
- The core handler, `get_spec_history_tool`, executes the tool logic: it reads snapshots from HISTORY_DIR, computes trend deltas (vs ~7 days ago and ~30 days ago), and returns markdown-formatted results.
```python
def get_spec_history_tool(arguments: dict) -> dict[str, Any]:
    """Return the last N snapshots + trend deltas (current vs ~7 days ago
    and ~30 days ago when those windows have data).

    Args:
        limit: int, default 10 — how many of the latest snapshots to return.
    """
    limit = int(arguments.get("limit", 10))
    snapshots = _read_snapshots()
    if not snapshots:
        return {
            "snapshots_total": 0,
            "snapshots": [],
            "trend": None,
            "markdown": "# Spec history\n\n_No snapshots yet — run `get_optimization_plan` to start tracking._",
        }
    recent = snapshots[-limit:]
    latest = snapshots[-1]
    now = _now_dt()

    def _pick(days_ago: int) -> dict | None:
        target = now - _dt.timedelta(days=days_ago)
        best = None
        best_gap = None
        for snap in snapshots:
            try:
                ts = _dt.datetime.strptime(
                    snap["timestamp"], "%Y-%m-%dT%H-%M-%SZ"
                ).replace(tzinfo=_dt.timezone.utc)
            except (KeyError, ValueError):
                continue
            gap = abs((ts - target).total_seconds())
            if best is None or gap < best_gap:
                best = snap
                best_gap = gap
        # Only return if best is within 1.5x the target window (avoid using
        # day-0 snapshot as the "30 days ago" baseline).
        if best is None or best_gap > days_ago * 86400 * 1.5:
            return None
        return best

    baseline_7d = _pick(7)
    baseline_30d = _pick(30)

    def _row(field: str, label: str) -> dict:
        return {
            "field": field,
            "label": label,
            "current": latest.get(field, 0),
            "vs_7d": _delta(latest.get(field, 0), baseline_7d[field])
            if baseline_7d and field in baseline_7d
            else "—",
            "vs_30d": _delta(latest.get(field, 0), baseline_30d[field])
            if baseline_30d and field in baseline_30d
            else "—",
        }

    trend = [
        _row("specs_total", "Specs tracked"),
        _row("untested_count", "Untested specs"),
        _row("quality_findings", "Quality findings"),
        _row("drifted_count", "Drifted specs"),
        _row("stranded_count", "Stranded specs"),
        _row("unknown_count", "Specs w/o ac_hash"),
    ]

    md = [
        "# Spec history",
        "",
        f"- Snapshots archived: {len(snapshots)}",
        f"- Latest snapshot: {latest.get('timestamp', '?')}",
        "",
        "| Metric | Current | vs 7d | vs 30d |",
        "|---|---:|---:|---:|",
    ]
    for r in trend:
        md.append(f"| {r['label']} | {r['current']} | {r['vs_7d']} | {r['vs_30d']} |")
    md.append("")
    md.append(
        "_Lower is better for untested / findings / drifted / stranded / unknown. "
        "Increases are red flags._"
    )
    return {
        "snapshots_total": len(snapshots),
        "snapshots": recent,
        "trend": trend,
        "markdown": "\n".join(md),
    }
```

- src/mk_spec_master/server.py:414-432 (schema): Tool registration with name, description, and inputSchema (accepts an optional 'limit' integer, default 10).
```python
Tool(
    name="get_spec_history",
    description=(
        "Return the last N snapshots archived by get_optimization_plan "
        "plus trend deltas (current vs ~7 days ago, vs ~30 days ago) "
        "for spec count, untested, quality findings, drift, stranded, "
        "and unknown-hash specs. Use when a user asks 'are we "
        "improving' / 'show me the trend' / 'how did we do this "
        "month'. Requires at least 2 snapshots for trend; degrades "
        "gracefully with fewer. "
        "Returns {snapshots_total, snapshots[], trend[], markdown}."
    ),
    inputSchema={
        "type": "object",
        "properties": {
            "limit": {"type": "integer", "default": 10},
        },
    },
),
```

- src/mk_spec_master/server.py:60-60 (registration): Dispatch table entry mapping 'get_spec_history' to `history_tools.get_spec_history_tool`.
```python
"get_spec_history": history_tools.get_spec_history_tool,
```

- The `archive_snapshot` helper writes a snapshot JSON to HISTORY_DIR; it is called by `get_optimization_plan_tool` to persist the data that `get_spec_history_tool` later reads.
```python
def archive_snapshot(snapshot: dict) -> str:
    """Write one snapshot JSON. Returns the path.

    Called from get_optimization_plan_tool. Failures are swallowed (we
    don't want history persistence to break the live coach output).
    """
    try:
        config.HISTORY_DIR.mkdir(parents=True, exist_ok=True)
        stamped = dict(snapshot)
        stamped["timestamp"] = _now_iso()
        path = config.HISTORY_DIR / f"{stamped['timestamp']}.json"
        path.write_text(
            json.dumps(stamped, indent=2, ensure_ascii=False) + "\n",
            encoding="utf-8",
        )
        return str(path)
    except (OSError, TypeError):
        return ""
```

- The `_read_snapshots` helper reads archived snapshots from HISTORY_DIR; it returns them in chronological order (oldest first) with an optional limit.
```python
def _read_snapshots(limit: int | None = None) -> list[dict]:
    """Return snapshots in chronological order (oldest first).

    Reads the directory listing; tolerates missing dir + bad files.
    """
    if not config.HISTORY_DIR.exists():
        return []
    files = sorted(config.HISTORY_DIR.glob("*.json"))
    if limit:
        files = files[-limit:]
    out: list[dict] = []
    for f in files:
        try:
            out.append(json.loads(f.read_text(encoding="utf-8")))
        except (OSError, json.JSONDecodeError):
            continue
    return out
```
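Together the two helpers form a simple write/read round trip: one JSON file per run, and oldest-first ordering falls out of a plain filename sort because the `%Y-%m-%dT%H-%M-%SZ` timestamps sort lexicographically in time order. A standalone sketch of that round trip (a simplified reimplementation for illustration, not the real module; the timestamp is passed explicitly so the example is deterministic):

```python
import json
import tempfile
from pathlib import Path

def write_snapshot(history_dir: Path, snapshot: dict, ts: str) -> Path:
    """Simplified archive_snapshot: stamp the dict and write one JSON file."""
    history_dir.mkdir(parents=True, exist_ok=True)
    stamped = {**snapshot, "timestamp": ts}
    path = history_dir / f"{ts}.json"
    path.write_text(json.dumps(stamped, indent=2) + "\n", encoding="utf-8")
    return path

def read_snapshots(history_dir: Path) -> list[dict]:
    """Simplified _read_snapshots: lexicographic filename sort yields
    chronological (oldest-first) order for this timestamp format."""
    return [
        json.loads(f.read_text(encoding="utf-8"))
        for f in sorted(history_dir.glob("*.json"))
    ]

with tempfile.TemporaryDirectory() as tmp:
    hist = Path(tmp) / "history"
    write_snapshot(hist, {"specs_total": 45}, "2024-06-01T00-00-00Z")
    write_snapshot(hist, {"specs_total": 50}, "2024-06-08T00-00-00Z")
    snaps = read_snapshots(hist)
    print([s["specs_total"] for s in snaps])  # [45, 50]
```

Note the design choice this illustrates: because the archive is just timestamped files in a directory, `get_spec_history` needs no database, and a corrupted or hand-deleted snapshot simply drops out of the trend rather than breaking the tool.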