Skip to main content
Glama
models.py (19.3 kB)
"""Pydantic models for living reports data structures. This module defines the core data models for the living reports system, providing validation and serialization for all report components. """ from __future__ import annotations import datetime as dt import uuid from typing import Any from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator def _now_iso() -> str: """Return current UTC time as ISO 8601 string.""" return dt.datetime.now(dt.UTC).isoformat() class DatasetSource(BaseModel): """Source binding for a dataset within a report. At least one of execution_id, sql_sha256, or cache_manifest must be provided so the resolver can bind this dataset to concrete history/cache artifacts. """ model_config = ConfigDict(extra="forbid") execution_id: str | None = Field( default=None, description="Execution ID from audit_info.execution_id or history JSONL", ) sql_sha256: str | None = Field( default=None, description="SHA-256 hash of the SQL text (statement_sha256)", ) cache_manifest: str | None = Field( default=None, description=( "Path to a cache manifest.json (absolute or repo-relative). When " "provided, this takes precedence over history lookups." ), ) cache_only: bool = Field( default=False, description="If true, do not attempt to re-run queries (reserved).", ) # Future hints for profile/context overrides (stored but unused for now). 
profile: str | None = Field(default=None) warehouse: str | None = Field(default=None) database: str | None = Field(default=None) db_schema: str | None = Field(default=None, alias="schema") role: str | None = Field(default=None) @field_validator("db_schema", mode="before") @classmethod def _validate_schema_alias(cls, v): """Allow 'schema' as an alias for 'db_schema'.""" return v def __or__(self, other): """Merge two DatasetSource instances, preferring self's values.""" if not isinstance(other, DatasetSource): return self merged = {} for field in self.model_fields: self_val = getattr(self, field) other_val = getattr(other, field) merged[field] = self_val if self_val is not None else other_val return DatasetSource(**merged) class Citation(BaseModel): """Flexible citation supporting multiple source types. Citations provide traceability for insights by linking to various data sources including queries, APIs, URLs, observations, and documents. """ model_config = ConfigDict(extra="forbid") # Required: source type source: str = Field( ..., description="Citation source type", pattern="^(query|api|url|observation|document)$", ) # Common optional fields provider: str | None = Field( default=None, description="Specific system (snowflake, allium, defillama, coingecko, etc.)", ) description: str | None = Field( default=None, description="Human-readable context for this citation", ) # Query-specific fields (source="query") execution_id: str | None = Field( default=None, description="Execution ID from query history (for Snowflake queries)", ) query_id: str | None = Field( default=None, description="Query ID for external query platforms (Allium, Dune, etc.)", ) sql_sha256: str | None = Field( default=None, description="SHA-256 hash of the SQL text", ) cache_manifest: str | None = Field( default=None, description="Path to cache manifest for query results", ) # URL-specific fields (source="url") url: str | None = Field( default=None, description="Web URL for articles, blogs, 
documentation", ) title: str | None = Field( default=None, description="Title of the web page or article", ) accessed_at: str | None = Field( default=None, description="ISO 8601 timestamp when URL was accessed", ) # API-specific fields (source="api") endpoint: str | None = Field( default=None, description="API endpoint path or URL", ) response_hash: str | None = Field( default=None, description="Hash of API response for verification", ) # Document-specific fields (source="document") path: str | None = Field( default=None, description="File path to document (PDF, whitepaper, etc.)", ) page: str | None = Field( default=None, description="Page number or section reference", ) # Observation-specific fields (source="observation") observed_at: str | None = Field( default=None, description="ISO 8601 timestamp when observation was made", ) class ResolvedDataset(BaseModel): """Concrete dataset resolved from history/cache artifacts.""" name: str rows: list[dict[str, Any]] columns: list[str] key_metrics: dict[str, Any] | None insights: list[Any] provenance: dict[str, Any] class ReportId: """Stable UUID-based identifier for reports. Provides string representation and validation for report identifiers. """ def __init__(self, value: str | uuid.UUID) -> None: """Initialize ReportId from string or UUID. 
Args: value: UUID string or UUID object Raises: ValueError: If value is not a valid UUID """ if isinstance(value, str): try: self._uuid = uuid.UUID(value) except ValueError as e: raise ValueError(f"Invalid UUID string: {value}") from e elif isinstance(value, uuid.UUID): self._uuid = value else: raise ValueError(f"ReportId must be string or UUID, got {type(value)}") @classmethod def new(cls) -> ReportId: """Create a new random ReportId.""" return cls(uuid.uuid4()) @property def uuid(self) -> uuid.UUID: """Get the underlying UUID object.""" return self._uuid def __str__(self) -> str: """Return canonical UUID string representation.""" return str(self._uuid) def __repr__(self) -> str: """Return detailed string representation.""" return f"ReportId('{self!s}')" def __eq__(self, other: object) -> bool: """Check equality with another ReportId.""" if not isinstance(other, ReportId): return NotImplemented return self._uuid == other._uuid def __hash__(self) -> int: """Hash based on UUID.""" return hash(self._uuid) class Insight(BaseModel): """A single insight within a report section. Insights represent key findings or observations that are backed by supporting query results and have configurable importance levels. """ model_config = ConfigDict(extra="forbid", validate_assignment=True) insight_id: str = Field( ..., description="Stable UUID for this insight", min_length=1, ) importance: int = Field( ..., description="Importance score from 0 (lowest) to 10 (highest)", ge=0, le=10, ) status: str = Field( "active", description="Insight status: active, archived, or killed", pattern="^(active|archived|killed)$", ) summary: str = Field( ..., description="Human-readable summary of the insight", min_length=1, ) supporting_queries: list[DatasetSource] = Field( default_factory=list, description="DEPRECATED: Use citations instead. 
List of query references.", ) citations: list[Citation] = Field( default_factory=list, description="List of citations (query, api, url, observation, document)", ) created_at: str = Field( default_factory=_now_iso, description="ISO 8601 timestamp when insight was created", ) updated_at: str = Field( default_factory=_now_iso, description="ISO 8601 timestamp when insight was last updated", ) draft_changes: dict[str, Any] | None = Field( default=None, description="Pending changes from LLM evolution", ) metadata: dict[str, Any] = Field( default_factory=dict, description="Additional insight metadata (chart_id, etc.)", ) @field_validator("insight_id") @classmethod def _validate_insight_id(cls, v: str) -> str: """Validate insight_id is a valid UUID string.""" try: uuid.UUID(v) except ValueError as e: raise ValueError(f"insight_id must be valid UUID string: {v}") from e return v @model_validator(mode="after") def _sync_citations(self) -> Insight: """Keep citations and supporting_queries in sync for backward compatibility. Migration shim: Automatically converts supporting_queries to citations. Prefers citations if both are present. 
""" # If citations is empty but supporting_queries has data, convert if not self.citations and self.supporting_queries: converted_citations = [ Citation( source="query", provider="snowflake", execution_id=q.execution_id, sql_sha256=q.sql_sha256, description=None, # Supporting queries don't have descriptions ) for q in self.supporting_queries ] # Use object.__setattr__ to bypass validate_assignment and prevent recursion object.__setattr__(self, "citations", converted_citations) # If supporting_queries is empty but citations has data, convert back # (for tools that still expect supporting_queries) if not self.supporting_queries and self.citations: # Only convert query-type citations back to DatasetSource converted_queries = [ DatasetSource( execution_id=cit.execution_id, sql_sha256=cit.sql_sha256, cache_manifest=cit.cache_manifest, ) for cit in self.citations if cit.source == "query" and (cit.execution_id or cit.sql_sha256) ] # Use object.__setattr__ to bypass validate_assignment and prevent recursion object.__setattr__(self, "supporting_queries", converted_queries) # Align updated_at with created_at if missing or empty if not self.updated_at: object.__setattr__(self, "updated_at", self.created_at) return self class Section(BaseModel): """A section within a report containing ordered insights. Sections provide logical grouping and ordering of insights within a report. 
""" model_config = ConfigDict(extra="forbid", validate_assignment=True) section_id: str = Field( ..., description="Stable UUID for this section", min_length=1, ) title: str = Field( ..., description="Human-readable section title", min_length=1, ) order: int = Field( ..., description="Display order (lower numbers appear first)", ge=0, ) insight_ids: list[str] = Field( default_factory=list, description="Ordered list of insight IDs in this section", ) notes: str | None = Field( default=None, description="Optional human notes or prose for this section", ) content: str | None = Field( default=None, description="Optional prose content for this section (e.g., markdown)", ) content_format: str | None = Field( default="markdown", description="Format for content field (markdown, html, plain)", pattern="^(markdown|html|plain)$", ) created_at: str = Field( default_factory=_now_iso, description="ISO 8601 timestamp when section was created", ) updated_at: str = Field( default_factory=_now_iso, description="ISO 8601 timestamp when section was last updated", ) metadata: dict[str, Any] = Field( default_factory=dict, description="Additional section metadata (category tags, etc.)", ) @field_validator("section_id") @classmethod def _validate_section_id(cls, v: str) -> str: """Validate section_id is a valid UUID string.""" try: uuid.UUID(v) except ValueError as e: raise ValueError(f"section_id must be valid UUID string: {v}") from e return v @model_validator(mode="after") def _default_updated_at(self) -> Section: """Ensure updated_at is always populated.""" if not self.updated_at: object.__setattr__(self, "updated_at", self.created_at) return self class Outline(BaseModel): """Machine-truth representation of a complete report. The outline contains all structural information about a report, including metadata, sections, and insights. This is the source of truth that drives report generation and evolution. 
""" model_config = ConfigDict(extra="ignore", validate_assignment=True) report_id: str = Field( ..., description="Stable report identifier", min_length=1, ) title: str = Field( ..., description="Human-readable report title", min_length=1, ) created_at: str = Field( ..., description="ISO 8601 timestamp when report was created", ) updated_at: str = Field( ..., description="ISO 8601 timestamp when report was last updated", ) version: str = Field( "1.0", description="Schema version for forward compatibility", ) outline_version: int = Field( 1, description="Monotonic version counter for optimistic locking", ge=1, ) sections: list[Section] = Field( default_factory=list, description="Ordered list of report sections", ) insights: list[Insight] = Field( default_factory=list, description="All insights referenced by sections", ) metadata: dict[str, Any] = Field( default_factory=dict, description="Additional report metadata (tags, owner, etc.)", ) @field_validator("report_id") @classmethod def _validate_report_id(cls, v: str) -> str: """Validate report_id is a valid ReportId string.""" try: ReportId(v) except ValueError as e: raise ValueError(f"report_id must be valid ReportId: {v}") from e return v def get_insight(self, insight_id: str) -> Insight: """Get insight by ID. Args: insight_id: UUID string of the insight Returns: The insight object Raises: ValueError: If insight not found """ for insight in self.insights: if insight.insight_id == insight_id: return insight raise ValueError(f"Insight not found: {insight_id}") def get_section(self, section_id: str) -> Section: """Get section by ID. Args: section_id: UUID string of the section Returns: The section object Raises: ValueError: If section not found """ for section in self.sections: if section.section_id == section_id: return section raise ValueError(f"Section not found: {section_id}") class AuditEvent(BaseModel): """Immutable audit event for report operations. 
Audit events provide complete traceability of all changes to reports, enabling revert operations and compliance auditing. """ model_config = ConfigDict(extra="forbid") action_id: str = Field( ..., description="Unique identifier for this audit event", min_length=1, ) report_id: str = Field( ..., description="Report identifier this event relates to", min_length=1, ) ts: str = Field( ..., description="ISO 8601 timestamp when action occurred", ) actor: str = Field( ..., description="Who performed the action (cli, agent, human)", pattern="^(cli|agent|human)$", ) action_type: str = Field( ..., description="Type of action performed", pattern=r"^(create|evolve|revert|rename|tag_update|render|manual_edit_detected|backup|status_change|fork|synthesize|archive|delete)$", ) request_id: str | None = Field( default=None, description="Optional correlation ID for request tracing", ) payload: dict[str, Any] = Field( default_factory=dict, description="Action-specific data and metadata", ) @field_validator("action_id") @classmethod def _validate_action_id(cls, v: str) -> str: """Validate action_id is a valid UUID string.""" try: uuid.UUID(v) except ValueError as e: raise ValueError(f"action_id must be valid UUID string: {v}") from e return v @field_validator("report_id") @classmethod def _validate_report_id(cls, v: str) -> str: """Validate report_id is a valid ReportId string.""" try: ReportId(v) except ValueError as e: raise ValueError(f"report_id must be valid ReportId: {v}") from e return v class IndexEntry(BaseModel): """Entry in the global reports index. Index entries provide fast lookup of reports by title or ID, and track basic metadata for listing operations. 
""" model_config = ConfigDict(extra="forbid") report_id: str = Field( ..., description="Report identifier", min_length=1, ) current_title: str = Field( ..., description="Current human-readable title", min_length=1, ) created_at: str = Field( ..., description="ISO 8601 timestamp when report was created", ) updated_at: str = Field( ..., description="ISO 8601 timestamp when report was last updated", ) tags: list[str] = Field( default_factory=list, description="User-defined tags for organization", ) status: str = Field( "active", description="Report status: active, archived, or deleted", pattern="^(active|archived|deleted)$", ) path: str = Field( ..., description="Relative path to report directory from reports root", ) @field_validator("report_id") @classmethod def _validate_report_id(cls, v: str) -> str: """Validate report_id is a valid ReportId string.""" try: ReportId(v) except ValueError as e: raise ValueError(f"report_id must be valid ReportId: {v}") from e return v __all__ = [ "AuditEvent", "Citation", "DatasetSource", "IndexEntry", "Insight", "Outline", "ReportId", "Section", ]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/igloo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.