export_report_bundle.py (8.65 kB)
#!/usr/bin/env python3
"""Export a self-contained bundle of SQL executions from doc.jsonl."""

from __future__ import annotations

import argparse
import json
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional

from igloo_mcp.path_utils import (
    find_repo_root,
    resolve_artifact_root,
    resolve_history_path,
)


@dataclass
class SelectionCriteria:
    query_ids: Optional[List[str]] = None
    reason_contains: Optional[str] = None
    since: Optional[str] = None
    until: Optional[str] = None
    latest_per_sql: bool = False

    def to_dict(self) -> Dict[str, Any]:
        payload: Dict[str, Any] = {}
        if self.query_ids is not None:
            payload["query_ids"] = self.query_ids
        if self.reason_contains is not None:
            payload["reason_contains"] = self.reason_contains
        if self.since is not None:
            payload["since"] = self.since
        if self.until is not None:
            payload["until"] = self.until
        payload["latest_per_sql"] = self.latest_per_sql
        return payload


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--doc",
        help="Path to doc.jsonl history (default: repo_local logs/doc.jsonl)",
        default=None,
    )
    parser.add_argument(
        "--artifact-root",
        help="Root directory for artifacts (default: repo_local logs/artifacts)",
        default=None,
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "--query-id",
        action="append",
        dest="query_ids",
        help="Specific query_id to include (repeatable)",
    )
    group.add_argument(
        "--reason-contains",
        help="Substring match (case-insensitive) on reason field",
    )
    parser.add_argument(
        "--since",
        help="ISO 8601 timestamp lower bound (inclusive) for ts field",
    )
    parser.add_argument(
        "--until",
        help="ISO 8601 timestamp upper bound (inclusive) for ts field",
    )
    parser.add_argument(
        "--latest-per-sql",
        action="store_true",
        help="Keep only the most recent entry per sql_sha256",
    )
    parser.add_argument(
        "--output",
        required=True,
        help="Bundle output path (JSON)",
    )
    return parser.parse_args()


def _resolve_doc_path(raw: Optional[str]) -> Path:
    if raw is None:
        return resolve_history_path()
    return resolve_history_path(raw=raw)


def _resolve_artifact_root(raw: Optional[str]) -> Path:
    if raw is None:
        return resolve_artifact_root()
    return resolve_artifact_root(raw=raw)


def _load_jsonl(path: Path) -> Iterable[Dict[str, Any]]:
    with path.open("r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            try:
                payload = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(payload, dict):
                yield payload


def _ts_from_iso8601(raw: Optional[str]) -> Optional[float]:
    if not raw:
        return None
    if raw.endswith("Z"):
        raw = raw[:-1] + "+00:00"
    try:
        return datetime.fromisoformat(raw).timestamp()
    except ValueError:
        raise ValueError(f"Invalid ISO 8601 timestamp: {raw}")


def _select_entries(
    records: Iterable[Dict[str, Any]],
    criteria: SelectionCriteria,
) -> List[Dict[str, Any]]:
    selected: List[Dict[str, Any]] = []
    since_ts = _ts_from_iso8601(criteria.since)
    until_ts = _ts_from_iso8601(criteria.until)
    for record in records:
        query_id = record.get("query_id")
        reason = record.get("reason", "") or ""
        ts = record.get("ts")
        if criteria.query_ids is not None:
            if query_id not in criteria.query_ids:
                continue
        else:
            if criteria.reason_contains is not None:
                if criteria.reason_contains.lower() not in reason.lower():
                    continue
            else:
                continue
        if since_ts is not None and ts is not None and ts < since_ts:
            continue
        if until_ts is not None and ts is not None and ts > until_ts:
            continue
        selected.append(record)
    if criteria.latest_per_sql:
        latest: Dict[str, Dict[str, Any]] = {}
        for record in selected:
            sha = record.get("sql_sha256")
            if not sha:
                continue
            current = latest.get(sha)
            if current is None or (record.get("ts") or 0) >= (current.get("ts") or 0):
                latest[sha] = record
        return list(latest.values())
    return selected


def _read_sql_text(
    artifact_root: Path, sha: str, artifacts: Dict[str, Any]
) -> Optional[str]:
    if len(sha) != 64 or any(c not in "0123456789abcdef" for c in sha):
        raise ValueError(f"Invalid sql_sha256 '{sha}'")
    if artifacts:
        sql_path = artifacts.get("sql_path")
        if sql_path:
            candidate = Path(sql_path)
            if not candidate.is_absolute():
                repo_root = find_repo_root()
                candidate = (repo_root / sql_path).resolve()
            if candidate.exists() and candidate.is_file():
                return candidate.read_text(encoding="utf-8")
    candidate = (artifact_root / "queries" / "by_sha" / f"{sha}.sql").resolve()
    if candidate.exists() and candidate.is_file():
        return candidate.read_text(encoding="utf-8")
    return None


def main() -> int:
    args = parse_args()
    try:
        doc_path = _resolve_doc_path(args.doc)
        artifact_root = _resolve_artifact_root(args.artifact_root)
    except Exception as exc:
        print(f"Failed to resolve paths: {exc}", file=sys.stderr)
        return 1
    if not doc_path.exists():
        print(f"History file not found: {doc_path}", file=sys.stderr)
        return 1

    criteria = SelectionCriteria(
        query_ids=args.query_ids,
        reason_contains=args.reason_contains,
        since=args.since,
        until=args.until,
    )
    criteria.latest_per_sql = args.latest_per_sql

    records = list(_load_jsonl(doc_path))
    selected = _select_entries(records, criteria)
    if not selected:
        print("No matching entries found", file=sys.stderr)
        return 1

    failures: Dict[str, str] = {}
    entries: List[Dict[str, Any]] = []
    for record in selected:
        sha = record.get("sql_sha256")
        if not sha:
            continue
        artifacts = record.get("artifacts") or {}
        sql_text = _read_sql_text(artifact_root, sha, artifacts)
        if sql_text is None:
            failures[sha] = "SQL artifact not found"
            continue
        entry: Dict[str, Any] = {
            "sql_sha256": sha,
            "mcp_uri": f"igloo://queries/by-sha/{sha}.sql",
            "sql_text": sql_text,
            "status": record.get("status"),
            "ts": record.get("ts"),
            "timeout_seconds": record.get("timeout_seconds"),
        }
        if record.get("query_id"):
            entry["query_id"] = record["query_id"]
        if record.get("reason"):
            entry["reason"] = record["reason"]
        if record.get("rowcount") is not None:
            entry["rowcount"] = record.get("rowcount")
        if record.get("duration_ms") is not None:
            entry["duration_ms"] = record.get("duration_ms")
        if record.get("overrides"):
            entry["overrides"] = record.get("overrides")
        entries.append(entry)

    if failures:
        for sha, msg in failures.items():
            print(f"Missing SQL for {sha}: {msg}", file=sys.stderr)
        return 1

    output_path = Path(args.output).expanduser()
    if not output_path.is_absolute():
        output_path = (find_repo_root() / output_path).resolve()
    output_path.parent.mkdir(parents=True, exist_ok=True)

    bundle = {
        "bundle_metadata": {
            "generated_at": datetime.utcnow().isoformat() + "Z",
            "doc_path": str(doc_path),
            "artifact_root": str(artifact_root),
            "selection": criteria.to_dict(),
        },
        "entries": entries,
    }

    with output_path.open("w", encoding="utf-8") as fh:
        json.dump(bundle, fh, indent=2)
        fh.write("\n")

    print(f"Wrote bundle with {len(entries)} entries to {output_path}")
    return 0


if __name__ == "__main__":  # pragma: no cover - CLI entrypoint
    sys.exit(main())
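
Example usage — a minimal sketch of invoking the script above programmatically. The flags match the parser defined in parse_args(); the query_id value and output path are hypothetical placeholders, not values taken from the repository, and the script is assumed to be run from the repository root.

    # Hypothetical invocation of export_report_bundle.py via subprocess.
    import subprocess
    import sys

    subprocess.run(
        [
            sys.executable,
            "export_report_bundle.py",
            "--query-id", "q_example_123",      # hypothetical query_id
            "--latest-per-sql",                  # keep newest entry per sql_sha256
            "--output", "reports/bundle.json",   # hypothetical output path
        ],
        check=True,
    )

The same selection could instead be driven by --reason-contains, since the two filters are declared in a mutually exclusive, required argument group.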
