# igloo_mcp/living_reports/service.py
"""High-level service layer for living reports operations. This module provides the main orchestration logic for living reports, coordinating between storage, index, and external integrations. """ from __future__ import annotations import contextlib import datetime import hashlib import json import logging import uuid import webbrowser from pathlib import Path from typing import Any from igloo_mcp.path_utils import resolve_history_path from . import models as living_reports_models from .history_index import HistoryIndex, ResolvedDataset from .index import ReportIndex from .models import AuditEvent, IndexEntry, Insight, Outline, ReportId, Section from .quarto_renderer import QuartoNotFoundError, QuartoRenderer, RenderResult from .storage import GlobalStorage, ReportStorage logger = logging.getLogger(__name__) class ReportService: """High-level service for living reports operations.""" def __init__(self, reports_root: Path | None = None) -> None: """Initialize report service. Args: reports_root: Root directory for reports (defaults to global/repo based on IGLOO_MCP_LOG_SCOPE) """ if reports_root is None: from igloo_mcp.path_utils import resolve_reports_root reports_root = resolve_reports_root() self.reports_root = reports_root self.global_storage = GlobalStorage(reports_root) self.index = ReportIndex(reports_root / "index.jsonl") # Initialize HistoryIndex lazily self._history_index: HistoryIndex | None = None @property def history_index(self) -> HistoryIndex: """Get the history index for resolving query references.""" if self._history_index is None: history_path = resolve_history_path() self._history_index = HistoryIndex(history_path) return self._history_index def _prepare_outline_for_render(self, outline: Outline) -> Outline: """Return a normalized copy of the outline with prose fields de-duplicated.""" normalized = outline.model_copy(deep=True) enumerated_sections = list(enumerate(normalized.sections)) enumerated_sections.sort( key=lambda item: ( item[1].order is None, item[1].order if item[1].order is not None else item[0], item[0], ) ) normalized.sections = [section for _, section in enumerated_sections] for idx, section in enumerate(normalized.sections): content = (section.content or "").strip() notes = (section.notes or "").strip() section.content = content or None if section.content: section.notes = None else: section.notes = notes or None if not section.content_format: section.content_format = "markdown" if section.order is None: section.order = idx return normalized def create_report( self, title: str, template: str = "default", actor: str = "cli", initial_sections: list[dict[str, Any]] | None = None, **metadata: Any, ) -> str: """Create a new report. Args: title: Human-readable title for the report template: Template name (default, monthly_sales, quarterly_review, deep_dive) actor: Who is creating the report (cli, agent, human) - defaults to "cli" for backward compatibility initial_sections: Optional inline sections (with optional inline insights) to seed the outline **metadata: Additional metadata (tags, owner, etc.) 

    def create_report(
        self,
        title: str,
        template: str = "default",
        actor: str = "cli",
        initial_sections: list[dict[str, Any]] | None = None,
        **metadata: Any,
    ) -> str:
        """Create a new report.

        Args:
            title: Human-readable title for the report
            template: Template name (default, monthly_sales, quarterly_review, deep_dive)
            actor: Who is creating the report (cli, agent, human) - defaults to
                "cli" for backward compatibility
            initial_sections: Optional inline sections (with optional inline
                insights) to seed the outline
            **metadata: Additional metadata (tags, owner, etc.)

        Returns:
            Report ID of the created report

        Raises:
            ValueError: If template name is invalid
        """
        # Import templates.py module directly (avoiding conflict with templates/ directory)
        import importlib.util

        templates_file = Path(__file__).parent / "templates.py"
        spec = importlib.util.spec_from_file_location(
            "igloo_mcp.living_reports.templates",
            templates_file,
            submodule_search_locations=[str(Path(__file__).parent)],
        )
        if spec is None or spec.loader is None:
            raise ValueError(f"Could not load templates module from {templates_file}")

        templates_mod = importlib.util.module_from_spec(spec)
        templates_mod.__package__ = "igloo_mcp.living_reports"
        templates_mod.__name__ = "igloo_mcp.living_reports.templates"

        # Set up models import for templates
        templates_mod.models = living_reports_models  # type: ignore[attr-defined]
        spec.loader.exec_module(templates_mod)
        get_template = templates_mod.get_template

        # Normalize actor to a valid value
        if actor not in {"cli", "agent", "human"}:
            actor = "cli"

        report_id = ReportId.new()
        now = datetime.datetime.now(datetime.UTC).isoformat()

        insights: list[Any] = []
        if initial_sections:
            sections: list[Any] = []

            def _ensure_supporting(payload: dict[str, Any]) -> None:
                if "supporting_queries" not in payload or payload["supporting_queries"] is None:
                    payload["supporting_queries"] = []
                if "citations" in payload and payload["citations"] is not None and not payload["supporting_queries"]:
                    payload["supporting_queries"] = payload["citations"]
                if payload.get("supporting_queries") and not payload.get("citations"):
                    payload["citations"] = payload["supporting_queries"]

            for idx, section_data in enumerate(initial_sections):
                data = dict(section_data)
                if "section_id" not in data or data["section_id"] is None:
                    data["section_id"] = str(uuid.uuid4())
                if "title" not in data or data["title"] is None:
                    data["title"] = f"Section {idx + 1}"
                if "order" not in data or data["order"] is None:
                    data["order"] = idx

                insight_ids: list[str] = []
                if "insights" in data and data["insights"] is not None:
                    raw_insights = data["insights"]
                    if not isinstance(raw_insights, list):
                        raise ValueError("initial_sections.insights must be a list of insight dicts")
                    for _insight_idx, raw_insight in enumerate(raw_insights):
                        if not isinstance(raw_insight, dict):
                            raise ValueError("Each insight must be a dict")
                        payload = dict(raw_insight)
                        if "insight_id" not in payload or payload["insight_id"] is None:
                            payload["insight_id"] = str(uuid.uuid4())
                        if payload.get("summary") is None or payload.get("importance") is None:
                            raise ValueError("Insights must include summary and importance")
                        if "status" not in payload or payload["status"] is None:
                            payload["status"] = "active"
                        _ensure_supporting(payload)
                        insights.append(Insight(**payload))
                        insight_ids.append(payload["insight_id"])
                elif data.get("insight_ids"):
                    if not isinstance(data["insight_ids"], list):
                        raise ValueError("initial_sections.insight_ids must be a list")
                    insight_ids = data["insight_ids"]

                section = Section(
                    section_id=data["section_id"],
                    title=data["title"],
                    order=data.get("order", idx),
                    notes=data.get("notes"),
                    content=data.get("content"),
                    content_format=data.get("content_format", "markdown"),
                    insight_ids=insight_ids,
                )
                sections.append(section)
        else:
            # Get template sections
            try:
                sections = get_template(template)
            except ValueError as e:
                raise ValueError(f"Invalid template: {e}") from e
            # Note: Empty sections from a template (like "default") are intentional
            # and should remain empty for maximum flexibility

        # Create initial outline with either template or provided sections
        outline = Outline(
            report_id=str(report_id),
            title=title,
            created_at=now,
            updated_at=now,
            version="1.0",
            outline_version=1,
            sections=sections,
            insights=insights,
            metadata={**metadata, "template": template},
        )

        # Create storage and save
        storage = self.global_storage.get_report_storage(str(report_id))
        with storage.lock():
            storage._save_outline_atomic(outline)  # No backup for initial creation

            # Create audit event
            audit_event = AuditEvent(
                action_id=str(uuid.uuid4()),
                report_id=str(report_id),
                ts=now,
                actor=actor,
                action_type="create",
                payload={"initial_title": title},
            )
            storage.append_audit_event(audit_event)

        # Add to index
        index_entry = IndexEntry(
            report_id=str(report_id),
            current_title=title,
            created_at=now,
            updated_at=now,
            tags=metadata.get("tags", []),
            status="active",
            path=f"by_id/{report_id}",
        )
        self.index.add_entry(index_entry)

        return str(report_id)
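
    # Illustrative create_report sketch (title, tags, and section payloads are
    # hypothetical; the Insight schema governs the actual importance type).
    # Inline insights must carry summary and importance; missing IDs and orders
    # are filled in automatically:
    #
    #     report_id = service.create_report(
    #         "Monthly Sales Review",
    #         template="default",
    #         tags=["sales"],
    #         initial_sections=[
    #             {
    #                 "title": "Revenue",
    #                 "insights": [{"summary": "Revenue up 12% MoM", "importance": "high"}],
    #             }
    #         ],
    #     )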

    def get_report_outline(self, report_id: str) -> Outline:
        """Get the current outline for a report.

        Args:
            report_id: Report identifier

        Returns:
            Current outline

        Raises:
            ValueError: If report not found
        """
        storage = self.global_storage.get_report_storage(report_id)
        try:
            return storage.load_outline()
        except FileNotFoundError as e:
            raise ValueError(f"Report not found: {report_id}") from e

    def update_report_status(
        self,
        report_id: str,
        status: str,
        actor: str = "agent",
        request_id: str | None = None,
    ) -> None:
        """Update report status in index and audit log."""
        now = datetime.datetime.now(datetime.UTC).isoformat()

        entry = self.index.get_entry(report_id)
        if entry:
            entry.status = status
            entry.updated_at = now
            self.index.add_entry(entry)
        else:
            raise ValueError(f"Report not found: {report_id}")

        # Record audit event
        storage = self.global_storage.get_report_storage(report_id)
        with storage.lock():
            audit_event = AuditEvent(
                action_id=str(uuid.uuid4()),
                report_id=report_id,
                ts=now,
                actor=actor,
                action_type="status_change",
                request_id=request_id,
                payload={"status": status},
            )
            storage.append_audit_event(audit_event)

    def update_report_outline(
        self,
        report_id: str,
        outline: Outline,
        actor: str = "cli",
        expected_version: int | None = None,
        request_id: str | None = None,
    ) -> None:
        """Update a report's outline with optional version check.

        Args:
            report_id: Report identifier
            outline: New outline
            actor: Who is making the change (cli, agent, human)
            expected_version: Expected outline_version for optimistic locking
            request_id: Optional request correlation ID for tracing

        Raises:
            ValueError: If version mismatch (concurrent modification detected)
        """
        storage = self.global_storage.get_report_storage(report_id)
        now = datetime.datetime.now(datetime.UTC).isoformat()

        # Normalize actor to a valid value for AuditEvent
        if actor not in {"cli", "agent", "human"}:
            actor = "agent"

        with storage.lock():
            # Load current outline for comparison
            old_outline = storage.load_outline()

            # Version check for optimistic locking
            if expected_version is not None and old_outline.outline_version != expected_version:
                raise ValueError(
                    f"Version mismatch: expected {expected_version}, "
                    f"got {old_outline.outline_version}. "
                    f"Report was modified concurrently."
                )

            # Increment version
            outline.outline_version = old_outline.outline_version + 1
            outline.updated_at = now

            # Save new outline (without additional audit events)
            backup_filename = storage._save_outline_atomic(outline)

            # Calculate change summary for audit payload
            sections_added = len(outline.sections) - len(old_outline.sections)
            insights_added = len(outline.insights) - len(old_outline.insights)

            # Track IDs of added/modified items
            old_section_ids = {s.section_id for s in old_outline.sections}
            new_section_ids = {s.section_id for s in outline.sections}
            section_ids_added = list(new_section_ids - old_section_ids)
            section_ids_modified = list(new_section_ids & old_section_ids)
            section_ids_removed = list(old_section_ids - new_section_ids)

            old_insight_ids = {i.insight_id for i in old_outline.insights}
            new_insight_ids = {i.insight_id for i in outline.insights}
            insight_ids_added = list(new_insight_ids - old_insight_ids)
            insight_ids_modified = list(new_insight_ids & old_insight_ids)
            insight_ids_removed = list(old_insight_ids - new_insight_ids)

            # Create audit event
            audit_event = AuditEvent(
                action_id=str(uuid.uuid4()),
                report_id=report_id,
                ts=now,
                actor=actor,
                action_type="evolve",
                request_id=request_id,
                payload={
                    "old_title": old_outline.title,
                    "new_title": outline.title,
                    "sections_changed": len(outline.sections) != len(old_outline.sections),
                    "insights_changed": len(outline.insights) != len(old_outline.insights),
                    "sections_added": sections_added,
                    "insights_added": insights_added,
                    "section_ids_added": section_ids_added,
                    "section_ids_modified": section_ids_modified,
                    "section_ids_removed": section_ids_removed,
                    "insight_ids_added": insight_ids_added,
                    "insight_ids_modified": insight_ids_modified,
                    "insight_ids_removed": insight_ids_removed,
                    "backup_filename": backup_filename,
                    "outline_version": outline.outline_version,
                },
            )
            storage.append_audit_event(audit_event)

        # Update index
        index_entry = IndexEntry(
            report_id=report_id,
            current_title=outline.title,
            created_at=outline.created_at,
            updated_at=now,
            tags=outline.metadata.get("tags", []),
            status=outline.metadata.get("status", "active"),
            path=f"by_id/{report_id}",
        )
        self.index.add_entry(index_entry)
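
    # Optimistic-locking sketch (illustrative): read, mutate, then write back
    # with the version you read; a concurrent writer raises ValueError instead
    # of silently losing the update.
    #
    #     outline = service.get_report_outline(report_id)
    #     outline.title = "Q3 Sales (final)"
    #     service.update_report_outline(
    #         report_id, outline, expected_version=outline.outline_version
    #     )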

    def resolve_report_selector(self, selector: str) -> str:
        """Resolve a report selector (ID or title) to a report ID.

        Args:
            selector: Report ID or title

        Returns:
            Resolved report ID

        Raises:
            ValueError: If selector cannot be resolved
        """
        # First try as direct ID
        try:
            ReportId(selector)
            if self.index.get_entry(selector):
                return selector
        except ValueError:
            pass

        # Try as title
        resolved = self.index.resolve_title(selector)
        if resolved:
            return resolved

        raise ValueError(f"Report not found: {selector}")

    def list_reports(
        self,
        status: str | None = None,
        tags: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """List reports with optional filtering.

        Args:
            status: Filter by status
            tags: Filter by tags

        Returns:
            List of report summaries
        """
        entries = self.index.list_entries(status=status, tags=tags)

        reports = []
        for entry in entries:
            reports.append(
                {
                    "id": entry.report_id,
                    "title": entry.current_title,
                    "created_at": entry.created_at,
                    "updated_at": entry.updated_at,
                    "tags": entry.tags,
                    "status": entry.status,
                }
            )
        return reports
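
    # Filtering sketch (tag values are hypothetical): selectors accept either a
    # report ID or a title, via resolve_report_selector above.
    #
    #     active_sales = service.list_reports(status="active", tags=["sales"])
    #     report_id = service.resolve_report_selector("Monthly Sales Review")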

    def revert_report(self, report_id: str, action_id: str, actor: str = "cli") -> dict[str, Any]:
        """Revert a report to a previous state.

        Args:
            report_id: Report identifier
            action_id: Action ID to revert to
            actor: Who is performing the revert

        Returns:
            Dictionary with revert details

        Raises:
            ValueError: If action_id not found or backup missing
        """
        storage = self.global_storage.get_report_storage(report_id)

        with storage.lock():
            # Load audit events
            events = storage.load_audit_events()

            # Find the target action
            target_event = None
            for event in events:
                if event.action_id == action_id:
                    target_event = event
                    break

            if not target_event:
                raise ValueError(
                    f"Action not found: {action_id}. Use 'igloo report history {report_id}' to see available actions."
                )

            # Get backup filename from event payload
            backup_filename = target_event.payload.get("backup_filename")
            if not backup_filename:
                raise ValueError(
                    f"No backup associated with action: {action_id}. "
                    f"Only 'evolve' and 'update' actions can be reverted."
                )

            backup_path = storage.backups_dir / backup_filename
            if not backup_path.exists():
                raise ValueError(
                    f"Backup file missing: {backup_filename}. "
                    f"It may have been deleted or filesystem corruption occurred."
                )

            # Load backup to verify it's valid
            try:
                backup_data = json.loads(backup_path.read_text(encoding="utf-8"))
                # Validate backup file structure
                Outline(**backup_data)
            except Exception as e:
                raise ValueError(f"Backup file is corrupted: {e}") from e

            # Create backup of current state before reverting
            current_backup_filename = storage._create_backup()

            # Restore backup atomically
            import shutil

            shutil.copy2(backup_path, storage.outline_path)

            # Log revert audit event
            now = datetime.datetime.now(datetime.UTC).isoformat()
            revert_audit_event = AuditEvent(
                action_id=str(uuid.uuid4()),
                report_id=report_id,
                ts=now,
                actor=actor,
                action_type="revert",
                payload={
                    "reverted_from_action_id": action_id,
                    "reverted_from_action_type": target_event.action_type,
                    "reverted_from_timestamp": target_event.ts,
                    "backup_restored": backup_filename,
                    "current_state_backup": current_backup_filename,
                },
            )
            storage.append_audit_event(revert_audit_event)

            # Update index
            outline = storage.load_outline()
            index_entry = IndexEntry(
                report_id=report_id,
                current_title=outline.title,
                created_at=outline.created_at,
                updated_at=now,
                tags=outline.metadata.get("tags", []),
                status=outline.metadata.get("status", "active"),
                path=f"by_id/{report_id}",
            )
            self.index.add_entry(index_entry)

        return {
            "success": True,
            "reverted_to_action_id": action_id,
            "reverted_to_timestamp": target_event.ts,
            "backup_filename": backup_filename,
        }

    def validate_report(self, report_id: str) -> list[str]:
        """Validate a report's consistency.

        Args:
            report_id: Report identifier

        Returns:
            List of validation errors
        """
        errors = []

        try:
            outline = self.get_report_outline(report_id)
        except ValueError as e:
            errors.append(str(e))
            return errors

        # Validate section references
        insight_ids = {insight.insight_id for insight in outline.insights}
        for section in outline.sections:
            for insight_id in section.insight_ids:
                if insight_id not in insight_ids:
                    errors.append(f"Section '{section.title}' references unknown insight: {insight_id}")

        # Validate insight references
        section_insight_ids = set()
        for section in outline.sections:
            section_insight_ids.update(section.insight_ids)

        for insight in outline.insights:
            if insight.insight_id not in section_insight_ids:
                errors.append(f"Insight '{insight.summary}' not referenced by any section")

        return errors
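
    # Revert sketch (the action UUID below is a placeholder): action IDs come
    # from the audit log (see the 'igloo report history <report_id>' hint
    # above), and only actions that recorded a backup_filename can be reverted.
    #
    #     result = service.revert_report(report_id, action_id="<action-uuid>")
    #     assert result["success"]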

    def resolve_insight_datasets(self, report_id: str) -> dict[str, ResolvedDataset]:
        """Resolve all insight datasets for a report.

        Args:
            report_id: Report identifier

        Returns:
            Dictionary mapping insight IDs to resolved datasets
        """
        outline = self.get_report_outline(report_id)
        resolved_datasets = {}

        for section in outline.sections:
            for insight_id in section.insight_ids:
                insight = outline.get_insight(insight_id)
                for i, dataset_source in enumerate(insight.supporting_queries):
                    try:
                        # Resolve dataset directly with name and source
                        resolved = self.history_index.resolve_dataset(
                            dataset_name=f"{insight_id}_{i}",
                            source=dataset_source,
                        )
                        resolved_datasets[insight_id] = resolved
                    except Exception as e:
                        # Skip unresolvable datasets for now
                        logger.debug(f"Could not resolve dataset for insight {insight_id}: {e}")
                        continue

        return resolved_datasets

    def archive_report(self, report_id: str, actor: str = "cli") -> None:
        """Archive a report (set status to archived).

        Args:
            report_id: Report identifier
            actor: Who is archiving the report
        """
        storage = self.global_storage.get_report_storage(report_id)
        now = datetime.datetime.now(datetime.UTC).isoformat()

        with storage.lock():
            outline = storage.load_outline()
            outline.metadata["status"] = "archived"
            outline.updated_at = now
            backup_filename = storage._save_outline_atomic(outline)

            # Log audit event
            audit_event = AuditEvent(
                action_id=str(uuid.uuid4()),
                report_id=report_id,
                ts=now,
                actor=actor,
                action_type="archive",
                payload={"backup_filename": backup_filename},
            )
            storage.append_audit_event(audit_event)

        # Update index
        index_entry = IndexEntry(
            report_id=report_id,
            current_title=outline.title,
            created_at=outline.created_at,
            updated_at=now,
            tags=outline.metadata.get("tags", []),
            status="archived",
            path=f"by_id/{report_id}",
        )
        self.index.add_entry(index_entry)

    def delete_report(self, report_id: str, actor: str = "cli") -> str:
        """Soft delete - move report to .trash directory.

        Args:
            report_id: Report identifier
            actor: Who is deleting the report

        Returns:
            Path to trash location

        Raises:
            ValueError: If report not found
        """
        storage = self.global_storage.get_report_storage(report_id)
        if not storage.report_dir.exists():
            raise ValueError(f"Report not found: {report_id}")

        # Create trash directory
        trash_dir = self.reports_root / ".trash"
        trash_dir.mkdir(exist_ok=True)

        # Generate trash location with timestamp
        now = datetime.datetime.now(datetime.UTC).isoformat()
        timestamp_clean = now.replace(":", "").replace("-", "").split(".")[0]
        trash_location = trash_dir / f"{report_id}_{timestamp_clean}"

        # Move report directory to trash
        import shutil

        shutil.move(str(storage.report_dir), str(trash_location))

        # Remove from index
        self.index.remove_entry(report_id)

        # Log delete event in trash location
        trash_storage = ReportStorage(trash_location)
        delete_audit_event = AuditEvent(
            action_id=str(uuid.uuid4()),
            report_id=report_id,
            ts=now,
            actor=actor,
            action_type="delete",
            payload={
                "trash_location": str(trash_location),
                "deleted_at": now,
            },
        )
        trash_storage.append_audit_event(delete_audit_event)

        return str(trash_location)
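
    # Lifecycle sketch (illustrative): archive flips status in place and keeps
    # the report listed; delete is a soft delete that moves the directory into
    # .trash and returns the trash path.
    #
    #     service.archive_report(report_id)
    #     trash_path = service.delete_report(report_id)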

    def tag_report(
        self,
        report_id: str,
        tags_to_add: list[str] | None = None,
        tags_to_remove: list[str] | None = None,
        actor: str = "cli",
    ) -> None:
        """Add or remove tags from report.

        Args:
            report_id: Report identifier
            tags_to_add: Tags to add (None or empty list to add nothing)
            tags_to_remove: Tags to remove (None or empty list to remove nothing)
            actor: Who is modifying tags
        """
        storage = self.global_storage.get_report_storage(report_id)
        now = datetime.datetime.now(datetime.UTC).isoformat()

        with storage.lock():
            outline = storage.load_outline()
            current_tags = set(outline.metadata.get("tags", []))

            if tags_to_add:
                current_tags.update(tags_to_add)
            if tags_to_remove:
                current_tags.difference_update(tags_to_remove)

            outline.metadata["tags"] = sorted(current_tags)
            outline.updated_at = now
            backup_filename = storage._save_outline_atomic(outline)

            # Log audit event
            audit_event = AuditEvent(
                action_id=str(uuid.uuid4()),
                report_id=report_id,
                ts=now,
                actor=actor,
                action_type="tag_update",
                payload={
                    "tags_added": list(tags_to_add) if tags_to_add else [],
                    "tags_removed": list(tags_to_remove) if tags_to_remove else [],
                    "final_tags": outline.metadata["tags"],
                    "backup_filename": backup_filename,
                },
            )
            storage.append_audit_event(audit_event)

        # Update index
        index_entry = IndexEntry(
            report_id=report_id,
            current_title=outline.title,
            created_at=outline.created_at,
            updated_at=now,
            tags=outline.metadata["tags"],
            status=outline.metadata.get("status", "active"),
            path=f"by_id/{report_id}",
        )
        self.index.add_entry(index_entry)

    def fork_report(self, source_id: str, new_title: str, actor: str = "cli") -> str:
        """Fork an existing report to new ID.

        Creates a complete copy of the source report with a new ID, preserving
        all sections, insights, and metadata (except fork tracking).

        Args:
            source_id: Source report ID
            new_title: Title for forked report
            actor: Who is forking the report

        Returns:
            New report ID

        Raises:
            ValueError: If source report not found
        """
        # Verify source exists (check for outline.json, not just directory)
        source_storage = self.global_storage.get_report_storage(source_id)
        if not source_storage.outline_path.exists():
            raise ValueError(f"Source report not found: {source_id}")

        # Generate new ID
        new_id = ReportId.new()
        new_report_dir = self.reports_root / "by_id" / str(new_id)

        # Copy entire directory structure (use dirs_exist_ok in case parent exists)
        import shutil

        shutil.copytree(source_storage.report_dir, new_report_dir, dirs_exist_ok=True)

        # Now get storage for the new report
        new_storage = self.global_storage.get_report_storage(str(new_id))

        # Load and update outline
        with new_storage.lock():
            outline = new_storage.load_outline()
            now = datetime.datetime.now(datetime.UTC).isoformat()

            # Update IDs and metadata
            outline.report_id = str(new_id)
            outline.title = new_title
            outline.created_at = now
            outline.updated_at = now
            outline.outline_version = 1  # Reset version
            outline.metadata["forked_from"] = source_id
            outline.metadata["forked_at"] = now

            # Save updated outline
            new_storage._save_outline_atomic(outline)

            # Log fork audit event
            fork_audit_event = AuditEvent(
                action_id=str(uuid.uuid4()),
                report_id=str(new_id),
                ts=now,
                actor=actor,
                action_type="fork",
                payload={
                    "source_report_id": source_id,
                    "new_title": new_title,
                },
            )
            new_storage.append_audit_event(fork_audit_event)

        # Register in index
        index_entry = IndexEntry(
            report_id=str(new_id),
            current_title=new_title,
            created_at=now,
            updated_at=now,
            tags=outline.metadata.get("tags", []),
            status="active",
            path=f"by_id/{new_id}",
        )
        self.index.add_entry(index_entry)

        return str(new_id)
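
    # Tag/fork sketch (tag names and titles are hypothetical):
    #
    #     service.tag_report(report_id, tags_to_add=["q3"], tags_to_remove=["draft"])
    #     fork_id = service.fork_report(report_id, new_title="Q3 Sales (scenario B)")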

    def synthesize_reports(self, source_ids: list[str], title: str, actor: str = "cli") -> str:
        """Create new report combining insights from multiple sources.

        Copies all sections and insights from source reports into a new report.
        Agents can then refine and reorganize the synthesized content.

        Args:
            source_ids: List of source report IDs
            title: Title for synthesized report
            actor: Who is creating the synthesis

        Returns:
            New report ID

        Raises:
            ValueError: If any source report not found
        """
        if not source_ids:
            raise ValueError("At least one source report is required")

        # Verify all sources exist (check for outline.json, not just directory)
        for source_id in source_ids:
            storage = self.global_storage.get_report_storage(source_id)
            if not storage.outline_path.exists():
                raise ValueError(f"Source report not found: {source_id}")

        # Create new report (empty)
        new_id = self.create_report(title, tags=["synthesized"], actor=actor)

        # Load all source outlines
        all_sections: list[Section] = []
        all_insights: list[Insight] = []
        all_tags = set()

        for source_id in source_ids:
            source_outline = self.get_report_outline(source_id)

            # Copy sections (with updated order to avoid conflicts)
            for section in source_outline.sections:
                # Create a copy with the new order (use current list length as order)
                section_copy = section.model_copy(deep=True)
                section_copy.order = len(all_sections)
                all_sections.append(section_copy)

            # Copy insights (create copies to avoid mutation issues)
            all_insights.extend(insight.model_copy(deep=True) for insight in source_outline.insights)

            # Merge tags
            all_tags.update(source_outline.metadata.get("tags", []))

        # Update new report with combined content
        new_outline = self.get_report_outline(new_id)
        new_outline.sections = all_sections
        new_outline.insights = all_insights

        # Preserve "synthesized" tag and add source tags
        all_tags.add("synthesized")
        new_outline.metadata["tags"] = sorted(all_tags)
        new_outline.metadata["synthesized_from"] = source_ids
        new_outline.metadata["synthesis_note"] = (
            f"Synthesized from {len(source_ids)} source reports. "
            "Sections and insights copied; agent can refine organization."
        )

        self.update_report_outline(new_id, new_outline, actor=actor)

        return new_id
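
    # Synthesis sketch (illustrative): source sections keep their content but
    # are renumbered sequentially, and the result always carries the
    # "synthesized" tag.
    #
    #     combined_id = service.synthesize_reports(
    #         [report_id, fork_id], title="Q3 Sales - Combined View"
    #     )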

    def _build_citation_map(self, outline: Outline) -> dict[str, int]:
        """Build stable mapping of execution_id to citation number.

        Scans all insights in the outline and creates a mapping from
        execution_id to citation number (1-indexed, ordered by first
        appearance).

        Args:
            outline: Report outline to scan

        Returns:
            Dictionary mapping execution_id to citation number
        """
        citation_map: dict[str, int] = {}
        citation_number = 1

        # Iterate through sections in order, then insights within each section
        for section in sorted(outline.sections, key=lambda s: s.order):
            for insight_id in section.insight_ids:
                try:
                    insight = outline.get_insight(insight_id)
                    references = insight.citations or insight.supporting_queries
                    # Only use first citation/supporting query for citation numbering
                    if references and len(references) > 0:
                        exec_id = references[0].execution_id
                        if exec_id and exec_id not in citation_map:
                            citation_map[exec_id] = citation_number
                            citation_number += 1
                except ValueError:
                    # Skip if insight not found
                    continue

        return citation_map

    def _get_next_order(self, outline: Outline) -> int:
        """Get next available order value for a new section.

        Args:
            outline: Report outline to check existing orders

        Returns:
            Next available order value (max existing + 1, or 0 if no sections)
        """
        existing = [s.order for s in outline.sections if s.order is not None]
        return max(existing, default=-1) + 1

    def _check_duplicate_orders(self, outline: Outline) -> list[str]:
        """Check for duplicate order values and return warnings.

        Args:
            outline: Report outline to check

        Returns:
            List of warning messages (empty if no duplicates)
        """
        orders = [s.order for s in outline.sections if s.order is not None]
        duplicates = [o for o in set(orders) if orders.count(o) > 1]
        if duplicates:
            return [
                f"Sections have duplicate order values: {sorted(set(duplicates))}. "
                "Rendering order may be unpredictable. Use reorder_sections to fix."
            ]
        return []

    def reorder_sections(
        self,
        report_id: str,
        section_order: list[str],
        actor: str = "cli",
    ) -> dict[str, Any]:
        """Reorder sections by section_id list.

        Assigns order values 0, 1, 2, ... based on position in the provided
        list. Sections not in the list keep their relative placement but are
        shifted to come after the reordered sections.

        Args:
            report_id: Report identifier
            section_order: List of section_ids in desired order
            actor: Who is performing the reorder

        Returns:
            Dictionary with:
                - reordered: Number of sections reordered
                - new_order: Final section order (all section_ids)
                - warnings: List of warning messages
        """
        outline = self.get_report_outline(report_id)
        section_map = {s.section_id: s for s in outline.sections}
        warnings: list[str] = []

        # Track which sections were explicitly ordered
        ordered_ids = set()
        for idx, sid in enumerate(section_order):
            if sid in section_map:
                section_map[sid].order = idx
                ordered_ids.add(sid)
            else:
                warnings.append(f"Section ID not found: {sid}")

        # Shift unordered sections to come after ordered ones
        next_order = len(section_order)
        for section in outline.sections:
            if section.section_id not in ordered_ids:
                section.order = next_order
                next_order += 1

        # Update the outline
        self.update_report_outline(report_id, outline, actor=actor)

        # Return final order
        final_order = [s.section_id for s in sorted(outline.sections, key=lambda s: s.order)]

        return {
            "reordered": len(ordered_ids),
            "new_order": final_order,
            "warnings": warnings,
        }
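
    # Reorder sketch (section IDs are hypothetical): listed sections get orders
    # 0..n-1; unlisted ones are shifted after them, and unknown IDs come back as
    # warnings rather than errors.
    #
    #     result = service.reorder_sections(report_id, ["sec-exec-summary", "sec-revenue"])
    #     print(result["new_order"], result["warnings"])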

    def render_report(
        self,
        report_id: str,
        format: str = "html",
        options: dict[str, Any] | None = None,
        open_browser: bool = False,
        include_preview: bool = False,
        preview_max_chars: int = 2000,
        dry_run: bool = False,
    ) -> dict[str, Any]:
        """Render a living report to the specified format using Quarto.

        Args:
            report_id: Report identifier
            format: Output format ('html', 'pdf', 'markdown', etc.)
            options: Additional Quarto rendering options (toc, theme, etc.)
            open_browser: Whether to open the rendered output in browser (HTML only)
            include_preview: Whether to include a truncated preview in response
            preview_max_chars: Maximum characters to include in the preview
            dry_run: If True, only generate QMD file without running Quarto

        Returns:
            Dictionary with render result containing:
                - status: 'success', 'quarto_missing', 'validation_failed', or 'render_failed'
                - report_id: Resolved report ID
                - output: Dict with format, output_path, assets_dir (if applicable)
                - preview: Truncated content string (if include_preview=True)
                - warnings: List of warning messages
                - audit_action_id: ID of the audit event logged

        Raises:
            ValueError: If report not found or invalid
        """
        options = options or {}

        try:
            # Resolve report ID
            resolved_id = self.resolve_report_selector(report_id)
        except ValueError as e:
            return {
                "status": "validation_failed",
                "report_id": report_id,
                "validation_errors": [str(e)],
            }

        # Validate report
        validation_errors = self.validate_report(resolved_id)
        if validation_errors:
            return {
                "status": "validation_failed",
                "report_id": resolved_id,
                "validation_errors": validation_errors,
            }

        # Detect Quarto (unless dry run)
        renderer = None
        if not dry_run:
            try:
                renderer = QuartoRenderer.detect()
            except QuartoNotFoundError as e:
                return {
                    "status": "quarto_missing",
                    "report_id": resolved_id,
                    "error": str(e),
                }

        # Load report data
        try:
            outline = self.get_report_outline(resolved_id)
            outline = self._prepare_outline_for_render(outline)
            storage = self.global_storage.get_report_storage(resolved_id)
            report_dir = storage.report_dir
            qmd_path = report_dir / "report.qmd"

            # Load dataset sources if they exist
            datasets = {}
            dataset_sources_path = report_dir / "dataset_sources.json"
            if dataset_sources_path.exists():
                with open(dataset_sources_path, encoding="utf-8") as f:
                    datasets = json.load(f)

            # Resolve query history for traceability using batch lookup
            query_provenance: dict[str, dict[str, Any]] = {}
            try:
                # Collect all execution_ids first for batch lookup
                execution_ids_to_resolve: list[str] = []
                exec_id_to_sql_sha: dict[str, str | None] = {}
                for insight in outline.insights:
                    references = insight.citations or insight.supporting_queries
                    for query in references:
                        if query.execution_id:
                            execution_ids_to_resolve.append(query.execution_id)
                            exec_id_to_sql_sha[query.execution_id] = getattr(query, "sql_sha256", None)

                # Batch lookup - a single call instead of N individual lookups
                history_records = self.history_index.get_records_batch(execution_ids_to_resolve)

                # Build provenance from batch results
                for exec_id, history_record in history_records.items():
                    query_provenance[exec_id] = {
                        "execution_id": exec_id,
                        "timestamp": history_record.get("timestamp") or history_record.get("ts"),
                        "duration_ms": history_record.get("duration_ms"),
                        "rowcount": history_record.get("rowcount"),
                        "status": history_record.get("status"),
                        "statement_preview": history_record.get("statement_preview"),
                        "sql_sha256": history_record.get("sql_sha256") or exec_id_to_sql_sha.get(exec_id),
                    }
            except Exception:
                # Don't fail rendering if provenance resolution fails
                pass
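
            # Worked example of the numbering below: if sections cite execution
            # IDs in the order ["abc", "def", "abc"], _build_citation_map yields
            # {"abc": 1, "def": 2} - first appearance wins, duplicates reuse it.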
"status": "render_failed", "report_id": resolved_id, "error": f"Failed to load report data: {e}", } # Render the report or generate QMD only (dry run) if dry_run: # Dry run: generate QMD file only try: # Prepare render hints with query provenance and citations render_hints = outline.metadata.get("render_hints", {}) if not isinstance(render_hints, dict): render_hints = {} render_hints["query_provenance"] = query_provenance render_hints["citation_map"] = citation_map render_hints["citation_details"] = citation_details renderer = renderer or QuartoRenderer() # Create instance for QMD generation renderer._generate_qmd_file(report_dir, format, options or {}, outline, datasets, render_hints) result = RenderResult( output_paths=[str(qmd_path)], stdout="Dry run: QMD file generated successfully", stderr="", warnings=["Dry run mode - Quarto render was skipped"], ) except Exception as e: return { "status": "render_failed", "report_id": resolved_id, "error": f"QMD generation failed: {e}", } else: # Normal render try: # Ensure renderer exists (should be set by detect() above) assert renderer is not None, "Renderer should be set by detect() for normal render" # Prepare render hints with query provenance and citations render_hints = outline.metadata.get("render_hints", {}) if not isinstance(render_hints, dict): render_hints = {} render_hints["query_provenance"] = query_provenance render_hints["citation_map"] = citation_map render_hints["citation_details"] = citation_details result = renderer.render( report_dir=report_dir, format=format, options=options, outline=outline, datasets=datasets, hints=render_hints, ) if qmd_path.exists() and str(qmd_path) not in result.output_paths: result.output_paths.insert(0, str(qmd_path)) except Exception as e: return { "status": "render_failed", "report_id": resolved_id, "error": f"Rendering failed: {e}", } # Create audit event now = datetime.datetime.now(datetime.UTC).isoformat() outline_sha256 = hashlib.sha256(json.dumps(outline.model_dump(), sort_keys=True).encode()).hexdigest() audit_event = AuditEvent( action_id=str(uuid.uuid4()), report_id=resolved_id, ts=now, actor="agent", action_type="render", payload={ "format": format, "output_files": result.output_paths, "quarto_version": renderer.version, "outline_sha256": outline_sha256, "options": options, }, ) # Save audit event storage = self.global_storage.get_report_storage(resolved_id) with storage.lock(): storage.append_audit_event(audit_event) # Prepare response primary_output_path = result.output_paths[0] if result.output_paths else None rendered_output_path: str | None = None qmd_output_path = str(qmd_path.resolve()) if qmd_path.exists() else None if result.output_paths: rendered_output_path = next( ( str(Path(p).resolve()) for p in result.output_paths if not str(p).endswith(".qmd") and Path(p).exists() ), None, ) if rendered_output_path is None: candidate = primary_output_path if candidate and Path(candidate).exists(): rendered_output_path = str(Path(candidate).resolve()) else: ext = {"markdown": "md"}.get(format, format) fallback_path = (report_dir / f"report.{ext}").resolve() if fallback_path.exists(): rendered_output_path = str(fallback_path) response = { "status": "success", "report_id": resolved_id, "output": { "format": format, "output_path": rendered_output_path or qmd_output_path, "qmd_path": qmd_output_path, "assets_dir": ( str((report_dir / "_files").resolve()) if format == "html" and (report_dir / "_files").exists() else None ), }, "warnings": result.warnings, "audit_action_id": audit_event.action_id, } 

        # Generate preview if requested
        if include_preview:
            preview_path = rendered_output_path or qmd_output_path
            if preview_path:
                try:
                    preview = self._generate_preview(preview_path, max_chars=preview_max_chars)
                    if preview:
                        response["preview"] = preview
                        response["output"]["preview"] = preview
                except Exception:
                    # Don't fail the render if preview generation fails
                    pass

        # Open in browser if requested and it's HTML
        if open_browser and format == "html" and result.output_paths:
            with contextlib.suppress(Exception):  # Don't fail if browser opening fails
                webbrowser.open(f"file://{result.output_paths[0]}")

        return response

    def _generate_preview(self, output_path: str, max_chars: int = 2000) -> str | None:
        """Generate a truncated preview of the rendered output.

        Args:
            output_path: Path to the rendered output file
            max_chars: Maximum characters to include in preview

        Returns:
            Truncated preview string or None if generation fails
        """
        try:
            path = Path(output_path)
            if not path.exists():
                return None

            with open(path, encoding="utf-8") as f:
                content = f.read()

            # Truncate and add indicator if needed
            if len(content) > max_chars:
                content = content[:max_chars] + "...\n\n[Content truncated]"

            return content
        except Exception:
            return None


__all__ = ["ReportService"]
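

# Minimal end-to-end sketch (illustrative, not shipped tooling): exercises the
# service against the resolved reports root, so it creates real state when run
# (e.g., via `python -m igloo_mcp.living_reports.service`). Title and tag are
# hypothetical; dry_run=True keeps the render step from requiring Quarto.
if __name__ == "__main__":
    service = ReportService()
    rid = service.create_report("Demo Report", tags=["demo"])
    outline = service.get_report_outline(rid)
    outline.title = "Demo Report (edited)"
    # outline_version is 1 after creation, so expected_version=1 passes the check
    service.update_report_outline(rid, outline, expected_version=1)
    print(service.render_report(rid, format="html", dry_run=True)["status"])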
