"""Bill amendment detection and comparison logic.
This module compares different versions of a bill to detect amendments:
- Fetches all available versions of a bill from Parliament.ca XML
- Compares section text between versions
- Stores text history with version tags in Neo4j
Amendment Detection Strategy:
1. Fetch LEGISinfo JSON to get list of publications (versions)
2. For bills with publication_type 80702 ("As amended by committee"), fetch both versions
3. Compare section-by-section to identify changes
4. Store historical text in the text_history_en JSON field (this module currently
   writes only the English history; text_history_fr is not populated here)
Example text_history format:
[
{"version": 1, "stage": "first-reading", "text": "original text...", "date": "2024-01-15"},
{"version": 3, "stage": "committee", "text": "amended text...", "date": "2024-03-20"}
]
"""
import json
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Add fedmcp package to path
FEDMCP_PATH = Path(__file__).parent.parent.parent.parent / "fedmcp" / "src"
sys.path.insert(0, str(FEDMCP_PATH))
from fedmcp.clients.bill_text_xml import (
BillTextXMLClient,
BillVersion,
BillStage,
ParsedBill,
BillSection,
BillSubsection,
BillParagraph,
)
from ..utils.neo4j_client import Neo4jClient
from ..utils.progress import logger
@dataclass
class TextHistoryEntry:
    """One snapshot of a section's text as it read in a given bill version.

    A list of these entries is serialised to JSON and stored as the
    ``text_history_en`` property of the amended node.
    """

    # Bill version number the text was taken from.
    version: int
    # Stage label of that version (e.g. "first-reading", "committee").
    stage: str
    # The section text as it read in that version.
    text: str
    # Optional date string for the version; None when not supplied.
    date: Optional[str] = None
@dataclass
class SectionDiff:
    """Outcome of comparing one section between two versions of a bill."""

    # Identifier of the section (taken from the newer version when present).
    section_id: str
    # Anchor ID used to match the section across versions.
    anchor_id: str
    # Text in the older version; None when the section is absent there.
    old_text: Optional[str]
    # Text in the newer version; None when the section is absent there.
    new_text: Optional[str]
    # Version number of the older bill in the comparison.
    old_version: int
    # Version number of the newer bill in the comparison.
    new_version: int
    # One of "added", "removed", "modified", "unchanged".
    change_type: str
# Typographic variants folded to a plain ASCII equivalent before comparing.
# Applied in a single pass; none of the replacement characters is itself a key,
# so the result matches applying the substitutions one after another.
_COMPARISON_CHAR_MAP = str.maketrans({
    "\u2014": "-",   # em-dash
    "\u2013": "-",   # en-dash
    "\u2012": "-",   # figure dash
    "\u2010": "-",   # hyphen
    "\u201c": '"',   # left double quote
    "\u201d": '"',   # right double quote
    "\u2018": "'",   # left single quote
    "\u2019": "'",   # right single quote
    "\u00a0": " ",   # non-breaking space
    "\u2009": " ",   # thin space
    "\u200b": None,  # zero-width space is dropped entirely
})


def normalize_text_for_comparison(text: Optional[str]) -> str:
    """Return a canonical form of *text* suitable for equality comparison.

    The goal is to keep purely typographic differences from being reported
    as amendments. The steps, in order, are:

    1. Unicode NFC normalization (composed form).
    2. Dashes (em, en, figure, Unicode hyphen) become ``-``.
    3. Curly double/single quotes become straight quotes.
    4. Non-breaking and thin spaces become a regular space; zero-width
       spaces are removed.
    5. Every run of whitespace collapses to a single space.
    6. Leading and trailing whitespace is stripped.

    Args:
        text: Raw section text, or None.

    Returns:
        The normalized string; ``""`` when *text* is None or empty.
    """
    import re
    import unicodedata

    if not text:
        return ""

    composed = unicodedata.normalize("NFC", text)
    folded = composed.translate(_COMPARISON_CHAR_MAP)
    return re.sub(r"\s+", " ", folded).strip()
def compare_section_text(old_text: Optional[str], new_text: Optional[str]) -> str:
    """Classify how a section's text changed between two versions.

    Both inputs are passed through ``normalize_text_for_comparison`` first,
    so purely typographic or whitespace differences count as no change.

    Args:
        old_text: Text in the earlier version (None/empty if absent).
        new_text: Text in the later version (None/empty if absent).

    Returns:
        ``"unchanged"``, ``"added"``, ``"removed"`` or ``"modified"``.
    """
    before = normalize_text_for_comparison(old_text)
    after = normalize_text_for_comparison(new_text)

    # Covers both "identical text" and "both sides empty".
    if before == after:
        return "unchanged"
    # From here on the two sides differ, so at most one of them is empty.
    if not before:
        return "added"
    if not after:
        return "removed"
    return "modified"
def build_section_map(bill: ParsedBill) -> Dict[str, Tuple[str, Optional[str]]]:
    """Flatten a parsed bill into an ``anchor_id -> (id, text_en)`` lookup.

    Sections, subsections, paragraphs and subparagraphs are all registered
    in the same dictionary. Sections nested inside ``bill.parts`` are
    visited first, followed by the loose ``bill.sections``; if two nodes
    share an anchor ID, the one visited later wins.

    Args:
        bill: Parsed bill to index.

    Returns:
        Mapping from anchor ID to an ``(id, English text)`` tuple.
    """
    lookup: Dict[str, Tuple[str, Optional[str]]] = {}

    def register(node: Any) -> None:
        lookup[node.anchor_id] = (node.id, node.text_en)

    def walk(sections: Any) -> None:
        # Depth-first: each node is registered before its children.
        for section in sections:
            register(section)
            for subsection in section.subsections:
                register(subsection)
                for paragraph in subsection.paragraphs:
                    register(paragraph)
                    for subparagraph in paragraph.subparagraphs:
                        register(subparagraph)

    for part in bill.parts:
        walk(part.sections)
    walk(bill.sections)

    return lookup
def compare_bill_versions(
    old_bill: ParsedBill,
    new_bill: ParsedBill,
) -> List[SectionDiff]:
    """Compare two versions of a bill and return the sections that changed.

    Sections are matched across versions by anchor ID. Sections whose
    normalized text is identical in both versions are omitted.

    The result order is deterministic: anchors are reported in the order
    they appear in ``old_bill``, followed by anchors that exist only in
    ``new_bill`` (in that bill's order).

    Args:
        old_bill: Earlier version of the bill (e.g., first reading)
        new_bill: Later version of the bill (e.g., committee amended)

    Returns:
        List of SectionDiff objects describing changes
    """
    old_sections = build_section_map(old_bill)
    new_sections = build_section_map(new_bill)

    # dict.fromkeys de-duplicates while preserving first-seen order; a plain
    # set would make the diff order vary between runs (string hashing is
    # randomised per process).
    ordered_anchors = dict.fromkeys([*old_sections, *new_sections])

    diffs: List[SectionDiff] = []
    for anchor_id in ordered_anchors:
        old_entry = old_sections.get(anchor_id)
        new_entry = new_sections.get(anchor_id)

        old_text = old_entry[1] if old_entry else None
        new_text = new_entry[1] if new_entry else None

        change_type = compare_section_text(old_text, new_text)
        if change_type == "unchanged":
            continue

        # Every anchor comes from at least one of the two maps, so one entry
        # is always present; prefer the newer version's section id.
        section_id = (new_entry or old_entry)[0]
        diffs.append(SectionDiff(
            section_id=section_id,
            anchor_id=anchor_id,
            old_text=old_text,
            new_text=new_text,
            old_version=old_bill.version_number,
            new_version=new_bill.version_number,
            change_type=change_type,
        ))

    return diffs
def fetch_bill_versions(
    client: BillTextXMLClient,
    parliament: int,
    session: int,
    bill_number: str,
    is_government: bool = False,
    max_versions: int = 5,
) -> List[Tuple[int, ParsedBill]]:
    """Fetch all available versions of a bill by probing version numbers.

    Versions ``1..max_versions`` are requested in order via
    ``client.parse_bill``. Probing stops at the first error whose message
    contains ``"404"`` or ``"not found"`` (treated as "no such version").
    Any other error is logged at debug level and that version is skipped,
    so the returned version numbers may have gaps.

    Args:
        client: XML client used to download and parse each version.
        parliament: Parliament number.
        session: Session number.
        bill_number: Bill code (e.g., "C-234").
        is_government: True for government bills.
        max_versions: Highest version number to probe (default 5).

    Returns:
        List of (version_number, ParsedBill) tuples in ascending version
        order; empty if no version could be fetched.
    """
    versions: List[Tuple[int, ParsedBill]] = []

    for v in range(1, max_versions + 1):
        try:
            bill = client.parse_bill(
                parliament=parliament,
                session=session,
                bill_number=bill_number,
                version=v,
                is_government=is_government,
            )
        except Exception as e:
            message = str(e)
            # Version doesn't exist, stop trying
            if "404" in message or "not found" in message.lower():
                break
            # Other error, log and move on to the next version number
            logger.debug(f" Could not fetch version {v}: {e}")
            continue

        versions.append((v, bill))
        logger.info(f" Fetched version {v} of {bill_number}")

    return versions
def detect_bill_amendments(
    neo4j_client: Neo4jClient,
    parliament: int,
    session: int,
    bill_number: str,
    is_government: bool = False,
    is_royal_assent: bool = False,
) -> Dict[str, Any]:
    """Detect amendments in a bill by comparing all available versions.

    This is the main entry point for amendment detection. Consecutive
    versions are compared pairwise; the Bill node is then updated with the
    outcome and, when changes were found, text histories are stored on the
    affected section nodes.

    When fewer than two versions are available nothing is written to Neo4j.

    Args:
        neo4j_client: Neo4j client for storing results
        parliament: Parliament number (e.g., 45)
        session: Session number (e.g., 1)
        bill_number: Bill code (e.g., "C-234")
        is_government: True for government bills
        is_royal_assent: True if bill is at Royal Assent (marks as final check)

    Returns:
        Dictionary with keys ``bill``, ``versions_found``, ``has_amendments``,
        ``total_diffs`` and ``diffs_by_type``. The "fewer than two versions"
        result additionally carries an empty ``diffs`` list (kept for
        backward compatibility).
    """
    session_str = f"{parliament}-{session}"
    normalized_number = bill_number.upper()
    bill_id = f"{session_str}:{normalized_number}"

    logger.info(f"Detecting amendments for {bill_id}...")

    client = BillTextXMLClient()

    # Fetch all versions
    versions = fetch_bill_versions(
        client, parliament, session, bill_number, is_government
    )

    if len(versions) < 2:
        logger.info(f" Only {len(versions)} version(s) found, no amendments to detect")
        # Same keys as the normal result so callers can read either shape.
        return {
            "bill": bill_id,
            "versions_found": len(versions),
            "has_amendments": False,
            "total_diffs": 0,
            "diffs_by_type": {},
            "diffs": [],
        }

    # Compare consecutive versions
    all_diffs: List[SectionDiff] = []
    for (old_version_num, old_bill), (new_version_num, new_bill) in zip(versions, versions[1:]):
        logger.info(f" Comparing version {old_version_num} -> {new_version_num}")

        diffs = compare_bill_versions(old_bill, new_bill)
        if diffs:
            logger.info(f" Found {len(diffs)} changes")
            all_diffs.extend(diffs)

    # Update Bill node with amendment info
    has_amendments = bool(all_diffs)
    latest_bill = versions[-1][1]
    latest_stage = latest_bill.stage.value if latest_bill.stage else "first-reading"

    _update_bill_amendment_status(
        neo4j_client,
        session_str,
        normalized_number,
        has_amendments=has_amendments,
        total_versions=len(versions),
        latest_version_stage=latest_stage,
        is_royal_assent=is_royal_assent,
    )

    # Store text history on sections
    if has_amendments:
        _store_section_histories(
            neo4j_client,
            versions,
            all_diffs,
        )

    return {
        "bill": bill_id,
        "versions_found": len(versions),
        "has_amendments": has_amendments,
        "total_diffs": len(all_diffs),
        "diffs_by_type": _count_diffs_by_type(all_diffs),
    }
def _count_diffs_by_type(diffs: List[SectionDiff]) -> Dict[str, int]:
    """Tally diffs per ``change_type``.

    Returns:
        Mapping from change type to number of diffs of that type; change
        types with no diffs are absent. Empty input gives an empty dict.
    """
    tally: Dict[str, int] = {}
    for entry in diffs:
        kind = entry.change_type
        if kind in tally:
            tally[kind] += 1
        else:
            tally[kind] = 1
    return tally
def _update_bill_amendment_status(
    neo4j_client: Neo4jClient,
    session: str,
    bill_number: str,
    has_amendments: bool,
    total_versions: int,
    latest_version_stage: str,
    is_royal_assent: bool = False,
) -> None:
    """Update Bill node with amendment tracking fields.

    Sets:
    - has_amendments: Whether any amendments were detected
    - total_versions: Number of bill versions found (1=first reading, 2=committee, etc.)
    - latest_version_stage: Stage of the latest version (first-reading, committee, etc.)
    - last_amendment_check: Timestamp of when this check was performed (for re-check scheduling)
    - checked_at_royal_assent: True if this bill was checked while at Royal Assent (final check);
      only written when ``is_royal_assent`` is True, never reset to false here

    If no Bill node matches ``session``/``bill_number`` nothing is written and
    an error is logged instead of the usual confirmation.

    Args:
        neo4j_client: Neo4j client used to run the update.
        session: Session string matched against ``Bill.session`` (e.g. "45-1").
        bill_number: Bill code matched against ``Bill.number``.
        has_amendments: Value stored on the node.
        total_versions: Value stored on the node.
        latest_version_stage: Value stored on the node.
        is_royal_assent: Also flag the node as checked at Royal Assent.
    """
    # Two variants of the same statement; the Royal Assent one sets one extra flag.
    if is_royal_assent:
        cypher = """
            MATCH (b:Bill {session: $session, number: $number})
            SET b.has_amendments = $has_amendments,
                b.total_versions = $total_versions,
                b.latest_version_stage = $latest_version_stage,
                b.last_amendment_check = datetime(),
                b.checked_at_royal_assent = true
            RETURN b.number as updated
        """
    else:
        cypher = """
            MATCH (b:Bill {session: $session, number: $number})
            SET b.has_amendments = $has_amendments,
                b.total_versions = $total_versions,
                b.latest_version_stage = $latest_version_stage,
                b.last_amendment_check = datetime()
            RETURN b.number as updated
        """

    result = neo4j_client.run_query(cypher, {
        "session": session,
        "number": bill_number,
        "has_amendments": has_amendments,
        "total_versions": total_versions,
        "latest_version_stage": latest_version_stage,
    })

    # The query returns one row per matched Bill; no rows means MATCH found
    # nothing and therefore nothing was updated.
    if not result:
        logger.error(
            f" No Bill node matched session={session} number={bill_number}; "
            "amendment status was not stored"
        )
        return

    status_note = " (final check at Royal Assent)" if is_royal_assent else ""
    logger.info(f" Updated Bill node: has_amendments={has_amendments}, versions={total_versions}{status_note}")
# Node labels that may carry a text history. Labels cannot be passed as Cypher
# parameters, so they are interpolated from this fixed, trusted tuple.
_HISTORY_NODE_LABELS: Tuple[str, ...] = (
    "BillSection",
    "BillSubsection",
    "BillParagraph",
    "BillSubparagraph",
)


def _build_section_history_updates(
    versions: List[Tuple[int, ParsedBill]],
    diffs: List[SectionDiff],
) -> List[Dict[str, Any]]:
    """Build one Neo4j update payload per amended anchor (no I/O)."""
    # anchor_id -> list of (version_num, stage, text); only anchors that
    # appear in a diff are tracked. Dict keeps first-seen diff order.
    raw_history: Dict[str, List[Tuple[int, str, str]]] = {
        diff.anchor_id: [] for diff in diffs
    }

    # Populate history from all versions
    for version_num, bill in versions:
        stage = bill.stage.value if bill.stage else "first-reading"
        for anchor_id, (_, text) in build_section_map(bill).items():
            if anchor_id in raw_history:
                raw_history[anchor_id].append((version_num, stage, text or ""))

    updates: List[Dict[str, Any]] = []
    for anchor_id, entries in raw_history.items():
        entries.sort(key=lambda e: e[0])

        # Keep the first appearance, then only versions whose normalized
        # text differs from the immediately preceding version.
        history: List[TextHistoryEntry] = []
        prev_normalized: Optional[str] = None
        for version_num, stage, text in entries:
            normalized = normalize_text_for_comparison(text)
            if prev_normalized is None or normalized != prev_normalized:
                history.append(TextHistoryEntry(
                    version=version_num,
                    stage=stage,
                    text=text,
                ))
            prev_normalized = normalized

        # Only keep sections that have at least 2 distinct versions (actual changes)
        if len(history) < 2:
            continue

        updates.append({
            "anchor_id": anchor_id,
            "text_history_en": json.dumps([
                {"version": e.version, "stage": e.stage, "text": e.text, "date": e.date}
                for e in history
            ]),
            "has_amendments": True,
            # Highest version number at which the text changed.
            "current_version": max(e.version for e in history),
            # Text from the latest version in which the section appears.
            "latest_text_en": entries[-1][2],
        })

    return updates


def _store_section_histories(
    neo4j_client: Neo4jClient,
    versions: List[Tuple[int, ParsedBill]],
    diffs: List[SectionDiff],
) -> None:
    """Store text history on sections that were amended.

    For each section in diffs, builds the text history across all versions
    but ONLY includes versions where the text actually changed.
    Also updates the main text_en field with the latest version's text.

    History filtering logic:
    - Always include the first version where the section appears
    - Only include subsequent versions if text differs from previous version
    - This avoids showing redundant identical versions in the UI

    Sections that end up with fewer than two history entries (for example
    ones present in a single version only) are not written at all.

    Args:
        neo4j_client: Neo4j client used to run the updates.
        versions: (version_number, ParsedBill) tuples for every fetched version.
        diffs: Diffs whose anchor IDs select which sections get a history.
    """
    updates = _build_section_history_updates(versions, diffs)
    if not updates:
        return

    # The same update is applied to every node type that can hold section
    # text; each anchor is expected to match under at most one label.
    # NOTE(review): nodes are matched by anchor_id alone - this assumes anchor
    # IDs are unique across bills; confirm against the ingestion code.
    total = 0
    for label in _HISTORY_NODE_LABELS:
        cypher = f"""
            UNWIND $updates AS u
            MATCH (n:{label} {{anchor_id: u.anchor_id}})
            SET n.text_history_en = u.text_history_en,
                n.has_amendments = u.has_amendments,
                n.current_version = u.current_version,
                n.text_en = u.latest_text_en
            RETURN count(n) as updated
        """
        result = neo4j_client.run_query(cypher, {"updates": updates})
        total += result[0]["updated"] if result else 0

    logger.info(f" Stored text history on {total} nodes")
def detect_amendments_for_session(
    neo4j_client: Neo4jClient,
    parliament: int,
    session: int,
    *,
    limit: Optional[int] = None,
) -> Dict[str, Any]:
    """Detect amendments for all bills in a session.

    Candidate bills are those that have passed second reading in the House
    or whose status mentions 'committee' or 'Third'. Each candidate is
    processed independently; a failure on one bill is logged and counted
    but does not stop the run.

    Args:
        neo4j_client: Neo4j client instance
        parliament: Parliament number
        session: Session number
        limit: Optional limit on number of bills to process. ``None`` or
            ``0`` means no limit.

    Returns:
        Summary statistics with keys ``session``, ``bills_processed``,
        ``bills_with_amendments``, ``total_diffs`` and ``bills_failed``.
    """
    session_str = f"{parliament}-{session}"
    logger.info(f"Detecting amendments for session {session_str}...")

    stats: Dict[str, Any] = {
        "session": session_str,
        "bills_processed": 0,
        "bills_with_amendments": 0,
        "total_diffs": 0,
        "bills_failed": 0,
    }

    # Get bills that might have amendments (have passed 2nd reading or committee)
    query = """
        MATCH (b:Bill {session: $session})
        WHERE b.passed_house_second_reading IS NOT NULL
           OR b.status CONTAINS 'committee'
           OR b.status CONTAINS 'Third'
        RETURN b.number as number, b.is_government_bill as is_gov
        ORDER BY b.number
    """
    params: Dict[str, Any] = {"session": session_str}
    if limit:
        # Pass the limit as a query parameter rather than splicing it into
        # the Cypher text.
        query += " LIMIT $limit"
        params["limit"] = int(limit)

    results = neo4j_client.run_query(query, params)

    if not results:
        logger.info(f"No bills found in {session_str} that might have amendments")
        return stats

    logger.info(f"Found {len(results)} bills to check for amendments")

    for row in results:
        bill_number = row["number"]
        is_gov = row.get("is_gov") or False

        try:
            result = detect_bill_amendments(
                neo4j_client,
                parliament=parliament,
                session=session,
                bill_number=bill_number,
                is_government=is_gov,
            )
        except Exception as e:
            # Best-effort batch: record the failure and carry on.
            stats["bills_failed"] += 1
            logger.error(f"Failed to detect amendments for {bill_number}: {e}")
            continue

        stats["bills_processed"] += 1
        if result.get("has_amendments"):
            stats["bills_with_amendments"] += 1
            stats["total_diffs"] += result.get("total_diffs", 0)

    logger.info(f"Amendment detection complete: {stats['bills_with_amendments']}/{stats['bills_processed']} bills amended")

    return stats