#!/usr/bin/env python3
"""
Comprehensive bill sync and text backfill script.
This script:
1. Updates all bill metadata from LEGISinfo (status, dates, new bills)
2. Finds all bills without full_text_en
3. Ingests bill structure and text for each one
Usage:
python scripts/backfill_bill_text.py [--limit N] [--session 45-1] [--text-only] [--dry-run]
Options:
--limit N Limit number of bills to process for text backfill
--session X Only process bills from this session (e.g., 45-1)
--text-only Skip LEGISinfo update, only backfill text
--dry-run Show what would be processed without actually doing it
"""
import sys
import argparse
from pathlib import Path
# Put the pipeline root and the sibling fedmcp sources at the front of
# sys.path so the fedmcp_pipeline imports that follow can be resolved when
# this file is executed directly as a script.
SCRIPT_DIR = Path(__file__).parent
PIPELINE_DIR = SCRIPT_DIR.parent
# One slice assignment; the resulting order (fedmcp/src first, then the
# pipeline root) matches two successive insert(0, ...) calls.
sys.path[:0] = [
    str(PIPELINE_DIR.parent / "fedmcp" / "src"),
    str(PIPELINE_DIR),
]
from fedmcp_pipeline.utils.neo4j_client import Neo4jClient
from fedmcp_pipeline.utils.config import Config
from fedmcp_pipeline.utils.progress import logger
def update_bills_from_legisinfo(neo4j: Neo4jClient) -> dict:
    """
    Refresh bills from the LEGISinfo JSON bulk export and log a summary.

    The work itself is delegated to ``ingest_bills_from_legisinfo_json``,
    which is expected to update:
    - Status changes (House -> Committee -> Senate -> Royal Assent)
    - Stage progression dates
    - Bill type, originating chamber, sponsor info
    - Any new bills that don't exist yet (created)

    Args:
        neo4j: Client passed straight through to the ingest function.

    Returns:
        The ingest result, unchanged. This function reads its ``count`` and
        ``new_bills`` entries for logging.
    """
    banner = "=" * 60
    logger.info(banner)
    logger.info("STEP 1: Updating bills from LEGISinfo...")
    logger.info(banner)

    # Imported here rather than at module level, after the banner is logged.
    from fedmcp_pipeline.ingest.parliament import ingest_bills_from_legisinfo_json

    result = ingest_bills_from_legisinfo_json(neo4j, batch_size=10000)
    updated_total = result.get("count", 0)
    created = result.get("new_bills", [])

    logger.success(f"✅ Updated {updated_total} bills from LEGISinfo")
    if created:
        logger.info(f" New bills created: {len(created)}")
        for entry in created:
            logger.info(f" - {entry['number']} ({entry['session']})")

    return result
def get_bills_without_text(neo4j: Neo4jClient, session: str = None, limit: int = None) -> list:
    """Return the bills whose ``full_text_en`` is missing or empty.

    Args:
        neo4j: Client whose ``run_query`` executes the Cypher query.
        session: Optional session filter such as ``'45-1'``. A falsy value
            means "all sessions".
        limit: Optional maximum number of rows. A falsy value (``None`` or
            ``0``) means "no limit".

    Returns:
        A list of the rows yielded by ``run_query``; each row exposes
        ``number``, ``session``, ``is_government_bill`` and ``bill_type``,
        ordered by session (descending) and then bill number.

    Raises:
        ValueError: If ``session`` is not of the form ``<digits>-<digits>``
            or ``limit`` is negative or not an integer value.
    """
    # Local import: this module's top-level imports do not include ``re``.
    import re

    # The text predicate is parenthesised on purpose. In Cypher, AND binds
    # tighter than OR, so without the parentheses an appended
    # "AND b.session = ..." would only constrain the empty-string branch and
    # bills with a NULL full_text_en from every session would be returned.
    query = """
        MATCH (b:Bill)
        WHERE (b.full_text_en IS NULL OR b.full_text_en = '')
    """

    if session:
        # The value is interpolated into the query text, so accept only the
        # strict "<parliament>-<session>" shape to rule out Cypher injection.
        if not isinstance(session, str) or not re.fullmatch(r"[0-9]+-[0-9]+", session):
            raise ValueError(
                f"Invalid session {session!r}; expected a value like '45-1'"
            )
        query += f" AND b.session = '{session}'"

    query += """
        RETURN b.number as number,
               b.session as session,
               b.is_government_bill as is_government_bill,
               b.bill_type as bill_type
        ORDER BY b.session DESC, b.number
    """

    if limit:
        # Coerce to int so only a number can ever reach the query text.
        row_limit = int(limit)
        if row_limit < 0:
            raise ValueError(f"limit must not be negative, got {limit!r}")
        query += f" LIMIT {row_limit}"

    return list(neo4j.run_query(query))
def get_total_bill_count(neo4j: Neo4jClient) -> dict:
    """Count Bill nodes overall and by whether ``full_text_en`` is populated.

    Args:
        neo4j: Client whose ``run_query`` executes the aggregate query.

    Returns:
        Dict with the keys ``"total"``, ``"with_text"`` and
        ``"without_text"``, copied from the first row of the query result.
    """
    count_query = """
        MATCH (b:Bill)
        RETURN
            count(b) as total,
            count(CASE WHEN b.full_text_en IS NOT NULL AND b.full_text_en <> '' THEN 1 END) as with_text,
            count(CASE WHEN b.full_text_en IS NULL OR b.full_text_en = '' THEN 1 END) as without_text
    """
    rows = list(neo4j.run_query(count_query))
    summary_row = rows[0]
    return {
        key: summary_row[key]
        for key in ("total", "with_text", "without_text")
    }
def parse_session(session_str: str) -> tuple:
    """Parse a session string like '45-1' into (parliament, session_number).

    Args:
        session_str: Text made of exactly two integers separated by ``-``.

    Returns:
        A ``(parliament, session_number)`` tuple of ints.

    Raises:
        ValueError: If ``session_str`` is not a string, does not consist of
            exactly two ``-``-separated parts, or a part is not an integer.
    """
    # A missing (None) or non-string session would otherwise surface as an
    # AttributeError, which callers handling ValueError would not catch.
    if not isinstance(session_str, str):
        raise ValueError(
            f"Session must be a string like '45-1', got {session_str!r}"
        )

    parts = session_str.split("-")
    # Reject extra components instead of silently ignoring them.
    if len(parts) != 2:
        raise ValueError(
            f"Session must look like '45-1', got {session_str!r}"
        )

    parliament, session_number = parts
    return int(parliament), int(session_number)
def backfill_bill_text(neo4j: Neo4jClient, bills: list) -> dict:
    """Ingest structure and full text for each bill, recording per-bill outcomes.

    A problem with one bill (unusable session value, an error reported by the
    ingest, or an exception raised by it) is logged and counted as a failure;
    processing then continues with the next bill.

    Args:
        neo4j: Client passed through to ``ingest_bill_structure``.
        bills: Records providing ``"number"`` and ``"session"``, and
            optionally ``"is_government_bill"`` (read via ``.get``).

    Returns:
        Dict with ``"total"`` (number of bills given), ``"success"`` and
        ``"failed"`` counters, and ``"errors"``, a list of
        ``"<number>: <message>"`` strings.
    """
    from fedmcp_pipeline.ingest.bill_structure import ingest_bill_structure

    total = len(bills)
    stats = {
        "total": total,
        "success": 0,
        "failed": 0,
        "errors": [],
    }

    for i, bill in enumerate(bills, 1):
        number = bill["number"]
        session = bill["session"]
        is_gov = bill.get("is_government_bill", False)

        # Parse parliament/session from the session string. AttributeError and
        # TypeError are caught as well so that a bill whose session is null or
        # not a string is recorded as a failure instead of aborting the batch.
        try:
            parliament, session_num = parse_session(session)
        except (ValueError, IndexError, AttributeError, TypeError):
            logger.warning(f" [{i}/{total}] {number}: Invalid session format '{session}'")
            stats["failed"] += 1
            stats["errors"].append(f"{number}: Invalid session '{session}'")
            continue

        logger.info(f" [{i}/{total}] Ingesting text for {number} ({session})...")

        try:
            result = ingest_bill_structure(
                neo4j,
                parliament=parliament,
                session=session_num,
                bill_number=number,
                version=1,
                is_government=is_gov,
                include_all_versions=False,
                include_full_text=True,
            )

            if result.get("error"):
                logger.warning(f" {number}: {result['error']}")
                stats["failed"] += 1
                stats["errors"].append(f"{number}: {result['error']}")
            else:
                parts = result.get("parts", 0)
                sections = result.get("sections", 0)
                has_en = result.get("full_text_en_extracted", False)
                has_fr = result.get("full_text_fr_extracted", False)
                logger.info(f" ✅ {number}: {parts} parts, {sections} sections, EN={has_en}, FR={has_fr}")
                stats["success"] += 1
        except Exception as e:
            # Deliberately broad: one failing bill must not stop the rest of
            # the run; the failure is logged and reported in the summary.
            logger.error(f" {number}: Exception - {e}")
            stats["failed"] += 1
            stats["errors"].append(f"{number}: {str(e)}")

    return stats
def main():
    """Run the sync and backfill workflow from the command line.

    Steps: log the initial bill counts, refresh bills from LEGISinfo (skipped
    with ``--text-only``, only announced with ``--dry-run``), look up bills
    without text, ingest text for them, then log a final summary. The Neo4j
    client is always closed on the way out.

    Returns:
        Process exit code: 0 when nothing needed doing, on a dry run, or when
        every bill was ingested successfully; 1 when at least one bill failed.
    """
    cli = argparse.ArgumentParser(description="Sync bills from LEGISinfo and backfill text")
    cli.add_argument("--limit", type=int, help="Limit number of bills to process for text backfill")
    cli.add_argument("--session", type=str, help="Only process bills from this session (e.g., 45-1)")
    cli.add_argument("--text-only", action="store_true", help="Skip LEGISinfo update, only backfill text")
    cli.add_argument("--dry-run", action="store_true", help="Show what would be processed without actually doing it")
    args = cli.parse_args()

    config = Config()
    neo4j = Neo4jClient(config.neo4j_uri, config.neo4j_user, config.neo4j_password)

    rule = "=" * 60

    try:
        # Snapshot of the database before anything is changed.
        before = get_total_bill_count(neo4j)
        logger.info(rule)
        logger.info("INITIAL STATE")
        logger.info(f"Total bills: {before['total']}")
        logger.info(f"With text: {before['with_text']}")
        logger.info(f"Without text: {before['without_text']}")
        logger.info(rule)

        # Step 1: LEGISinfo refresh, unless --text-only was given.
        created = []
        refresh_requested = not args.text_only
        if refresh_requested and args.dry_run:
            logger.info("DRY RUN - would update all bills from LEGISinfo")
        elif refresh_requested:
            legisinfo_result = update_bills_from_legisinfo(neo4j)
            created = legisinfo_result.get("new_bills", [])

        # Step 2: collect the bills that still lack text.
        logger.info("")
        logger.info(rule)
        logger.info("STEP 2: Finding bills without text...")
        logger.info(rule)

        pending = get_bills_without_text(neo4j, session=args.session, limit=args.limit)
        if not pending:
            logger.success("✅ All bills already have text!")
            return 0

        pending_count = len(pending)
        logger.info(f"Found {pending_count} bills without text")

        if args.dry_run:
            logger.info("DRY RUN - would process these bills:")
            # Only the first 20 are listed; the remainder is summarised.
            for candidate in pending[:20]:
                logger.info(f" - {candidate['number']} ({candidate['session']})")
            if pending_count > 20:
                logger.info(f" ... and {pending_count - 20} more")
            return 0

        # Step 3: ingest the text.
        logger.info("")
        logger.info(rule)
        logger.info("STEP 3: Backfilling bill text...")
        logger.info(rule)

        outcome = backfill_bill_text(neo4j, pending)

        # Final summary.
        after = get_total_bill_count(neo4j)

        logger.info("")
        logger.info(rule)
        logger.success("SYNC & BACKFILL COMPLETE")
        logger.info(rule)
        logger.info("")
        logger.info("LEGISinfo Update:")
        if args.text_only:
            logger.info(" (skipped with --text-only)")
        else:
            logger.info(f" Bills updated: {before['total']} -> {after['total']}")
            if created:
                logger.info(f" New bills: {len(created)}")
        logger.info("")
        logger.info("Text Backfill:")
        logger.info(f" Processed: {outcome['total']}")
        logger.info(f" Success: {outcome['success']}")
        logger.info(f" Failed: {outcome['failed']}")
        logger.info("")
        logger.info("Final State:")
        logger.info(f" Total bills: {after['total']}")
        logger.info(f" With text: {after['with_text']}")
        logger.info(f" Without text: {after['without_text']}")

        problems = outcome["errors"]
        if problems:
            logger.info("")
            logger.warning("Errors encountered:")
            # At most ten errors are printed in full.
            for message in problems[:10]:
                logger.warning(f" - {message}")
            if len(problems) > 10:
                logger.warning(f" ... and {len(problems) - 10} more")

        logger.info(rule)

        if outcome["failed"] == 0:
            return 0
        return 1
    finally:
        neo4j.close()
if __name__ == "__main__":
    # sys.exit rather than the bare exit() builtin: exit() is injected by the
    # site module for interactive use and is not guaranteed to exist
    # (for example under ``python -S``).
    sys.exit(main())