"""Phase E1: Version Chain data model — unit + integration tests.
Covers:
- version_group, version_number, parent_job_id columns exist after schema init
- assign_version_group() sets columns correctly
- First member of a new group gets version_number=1, parent_job_id=NULL
- Second member gets version_number=2, parent_job_id=<first job_id>
- Third member gets version_number=3, parent_job_id=<second job_id>
- assign_version_group() is idempotent on re-assignment to same group
- get_version_chain() returns jobs ordered by version_number ASC
- get_version_chain() for unknown group returns []
- PROMPT_FULL_EXTRACTION includes structured revision block schema
"""
import json
import os
import sys
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
# ── DuckDB backend fixture ─────────────────────────────────────────
@pytest.fixture(autouse=True)
def _use_duckdb(tmp_path, monkeypatch):
"""Force DuckDB backend with fresh temp DB for each test."""
db_path = str(tmp_path / "test_e1.duckdb")
monkeypatch.setenv("SF_PERMITS_DB", db_path)
monkeypatch.delenv("DATABASE_URL", raising=False)
import src.db as db_mod
monkeypatch.setattr(db_mod, "BACKEND", "duckdb")
monkeypatch.setattr(db_mod, "_DUCKDB_PATH", db_path)
import web.auth as auth_mod
monkeypatch.setattr(auth_mod, "_schema_initialized", False)
db_mod.init_user_schema()
# ── helpers ────────────────────────────────────────────────────────
def _make_job(user_id: int = 1, filename: str = "plans.pdf") -> str:
"""Create a minimal job and return its job_id."""
from web.plan_jobs import create_job
return create_job(
user_id=user_id,
filename=filename,
file_size_mb=1.0,
)
# ── migration: E1 columns ─────────────────────────────────────────
def test_version_group_column_exists():
"""version_group column should exist after schema init."""
from src.db import query
rows = query(
"SELECT column_name FROM duckdb_columns() "
"WHERE table_name = 'plan_analysis_jobs' AND column_name = 'version_group'"
)
assert rows, "version_group column is missing from plan_analysis_jobs"
def test_version_number_column_exists():
"""version_number column should exist after schema init."""
from src.db import query
rows = query(
"SELECT column_name FROM duckdb_columns() "
"WHERE table_name = 'plan_analysis_jobs' AND column_name = 'version_number'"
)
assert rows, "version_number column is missing from plan_analysis_jobs"
def test_parent_job_id_column_exists():
"""parent_job_id column should exist after schema init."""
from src.db import query
rows = query(
"SELECT column_name FROM duckdb_columns() "
"WHERE table_name = 'plan_analysis_jobs' AND column_name = 'parent_job_id'"
)
assert rows, "parent_job_id column is missing from plan_analysis_jobs"
def test_new_job_version_columns_default_null():
"""Freshly created jobs have version_group=NULL, version_number=NULL, parent_job_id=NULL."""
from src.db import query_one
job_id = _make_job()
row = query_one(
"SELECT version_group, version_number, parent_job_id "
"FROM plan_analysis_jobs WHERE job_id = %s",
(job_id,),
)
assert row is not None
assert row[0] is None # version_group
assert row[1] is None # version_number
assert row[2] is None # parent_job_id
# ── assign_version_group() ─────────────────────────────────────────
def test_assign_version_group_first_member():
"""First member of a new group: version_number=1, parent_job_id=NULL."""
from web.plan_jobs import assign_version_group
from src.db import query_one
job_id = _make_job()
assign_version_group(job_id, group_id=job_id) # seed: group_id = own job_id
row = query_one(
"SELECT version_group, version_number, parent_job_id "
"FROM plan_analysis_jobs WHERE job_id = %s",
(job_id,),
)
assert row[0] == job_id # version_group set
assert row[1] == 1 # first version
assert row[2] is None # no parent
def test_assign_version_group_second_member():
"""Second member gets version_number=2, parent_job_id=first job."""
from web.plan_jobs import assign_version_group
from src.db import query_one
job1 = _make_job(filename="v1.pdf")
job2 = _make_job(filename="v2.pdf")
group_id = job1
assign_version_group(job1, group_id)
assign_version_group(job2, group_id)
row2 = query_one(
"SELECT version_group, version_number, parent_job_id "
"FROM plan_analysis_jobs WHERE job_id = %s",
(job2,),
)
assert row2[0] == group_id
assert row2[1] == 2
assert row2[2] == job1
def test_assign_version_group_third_member():
"""Third member gets version_number=3, parent_job_id=second job."""
from web.plan_jobs import assign_version_group
from src.db import query_one
job1 = _make_job(filename="v1.pdf")
job2 = _make_job(filename="v2.pdf")
job3 = _make_job(filename="v3.pdf")
group_id = job1
assign_version_group(job1, group_id)
assign_version_group(job2, group_id)
assign_version_group(job3, group_id)
row3 = query_one(
"SELECT version_number, parent_job_id "
"FROM plan_analysis_jobs WHERE job_id = %s",
(job3,),
)
assert row3[0] == 3
assert row3[1] == job2
def test_assign_version_group_idempotent():
"""Re-assigning a job to the same group keeps its version_number unchanged."""
from web.plan_jobs import assign_version_group
from src.db import query_one
job1 = _make_job(filename="v1.pdf")
job2 = _make_job(filename="v2.pdf")
group_id = job1
assign_version_group(job1, group_id)
assign_version_group(job2, group_id)
# Re-assigning job2 to the same group should give version_number=3 (not idempotent by design —
# it will increment again; that's acceptable for the data model).
# The important thing is it doesn't raise an error.
assign_version_group(job2, group_id) # should not raise
def test_assign_version_group_different_groups_independent():
"""Two separate groups maintain independent version_number sequences."""
from web.plan_jobs import assign_version_group
from src.db import query_one
job_a1 = _make_job(filename="a1.pdf")
job_b1 = _make_job(filename="b1.pdf")
job_b2 = _make_job(filename="b2.pdf")
assign_version_group(job_a1, group_id=job_a1) # group A, only member
assign_version_group(job_b1, group_id=job_b1) # group B, v1
assign_version_group(job_b2, group_id=job_b1) # group B, v2
row_a1 = query_one(
"SELECT version_number FROM plan_analysis_jobs WHERE job_id = %s", (job_a1,)
)
row_b2 = query_one(
"SELECT version_number FROM plan_analysis_jobs WHERE job_id = %s", (job_b2,)
)
assert row_a1[0] == 1 # Group A: still v1
assert row_b2[0] == 2 # Group B: v2
# ── get_version_chain() ────────────────────────────────────────────
def test_get_version_chain_returns_ordered():
"""get_version_chain() returns jobs ordered by version_number ASC."""
from web.plan_jobs import assign_version_group, get_version_chain
job1 = _make_job(filename="v1.pdf")
job2 = _make_job(filename="v2.pdf")
job3 = _make_job(filename="v3.pdf")
group_id = job1
assign_version_group(job1, group_id)
assign_version_group(job2, group_id)
assign_version_group(job3, group_id)
chain = get_version_chain(group_id)
assert len(chain) == 3
assert [c["version_number"] for c in chain] == [1, 2, 3]
assert chain[0]["job_id"] == job1
assert chain[1]["job_id"] == job2
assert chain[2]["job_id"] == job3
def test_get_version_chain_includes_required_fields():
"""get_version_chain() dicts include all expected fields."""
from web.plan_jobs import assign_version_group, get_version_chain
job1 = _make_job(filename="test.pdf")
assign_version_group(job1, group_id=job1)
chain = get_version_chain(job1)
assert len(chain) == 1
entry = chain[0]
required_fields = {
"job_id", "version_number", "parent_job_id", "filename", "status",
"created_at", "completed_at", "property_address", "permit_number",
"analysis_mode", "pages_analyzed",
}
assert required_fields.issubset(set(entry.keys()))
assert entry["filename"] == "test.pdf"
assert entry["status"] == "pending"
def test_get_version_chain_empty_for_unknown_group():
"""get_version_chain() returns [] for an unrecognised group_id."""
from web.plan_jobs import get_version_chain
chain = get_version_chain("nonexistent-group-xyz")
assert chain == []
def test_get_version_chain_parent_chain_is_correct():
"""parent_job_id links form a correct chain: v1 → None, v2 → v1, v3 → v2."""
from web.plan_jobs import assign_version_group, get_version_chain
job1 = _make_job(filename="v1.pdf")
job2 = _make_job(filename="v2.pdf")
job3 = _make_job(filename="v3.pdf")
group_id = job1
assign_version_group(job1, group_id)
assign_version_group(job2, group_id)
assign_version_group(job3, group_id)
chain = get_version_chain(group_id)
parents = [c["parent_job_id"] for c in chain]
assert parents[0] is None # v1 has no parent
assert parents[1] == job1 # v2 → v1
assert parents[2] == job2 # v3 → v2
# ── PROMPT_FULL_EXTRACTION structured revision block ──────────────
def test_prompt_full_extraction_has_revisions_key():
"""PROMPT_FULL_EXTRACTION JSON schema includes 'revisions' array."""
from src.vision.prompts import PROMPT_FULL_EXTRACTION
assert '"revisions"' in PROMPT_FULL_EXTRACTION, (
"PROMPT_FULL_EXTRACTION must include 'revisions' array in title_block JSON schema"
)
def test_prompt_full_extraction_revisions_has_required_fields():
"""PROMPT_FULL_EXTRACTION revision schema includes revision_number, revision_date, description."""
from src.vision.prompts import PROMPT_FULL_EXTRACTION
assert "revision_number" in PROMPT_FULL_EXTRACTION
assert "revision_date" in PROMPT_FULL_EXTRACTION
assert "description" in PROMPT_FULL_EXTRACTION
def test_prompt_full_extraction_revisions_not_null_string():
"""PROMPT_FULL_EXTRACTION should NOT use flat 'revision': null — that's the old format."""
from src.vision.prompts import PROMPT_FULL_EXTRACTION
# The old flat field was: "revision": null
# New structured format uses "revisions": [...]
# Make sure we haven't kept the old field alongside the new one
import re
# Look for the old pattern: "revision": null (standalone, not "revisions")
old_pattern = r'"revision"\s*:\s*null'
assert not re.search(old_pattern, PROMPT_FULL_EXTRACTION), (
"Old flat 'revision': null should be replaced by structured 'revisions' array"
)
def test_prompt_includes_revision_extraction_instructions():
"""PROMPT_FULL_EXTRACTION should describe extracting revision history rows."""
from src.vision.prompts import PROMPT_FULL_EXTRACTION
assert "revision" in PROMPT_FULL_EXTRACTION.lower()
# Should mention the table structure
assert "revision_number" in PROMPT_FULL_EXTRACTION or "Revision block" in PROMPT_FULL_EXTRACTION