#!/usr/bin/env python3
"""
Daily cross-reference extraction for Hansard statements.
This script is designed to run as a Cloud Run Job after the daily Hansard import.
It processes statements that don't have MENTIONS relationships yet and creates
links to Bills, MPs, Committees, Petitions, and Votes.
Schedule:
- Hansard import: 4:00 AM ET
- Cross-reference extraction: 5:00 AM ET (this script)
Usage:
# Default: Process unprocessed statements from the last 30 days
python scripts/run_cross_references.py
# Process all unprocessed statements
python scripts/run_cross_references.py --all-unprocessed
# Process statements from specific date range
python scripts/run_cross_references.py --from-date 2025-01-01 --to-date 2025-01-15
# Dry run (no writes)
python scripts/run_cross_references.py --dry-run
Environment Variables:
NEO4J_URI - Neo4j connection URI (default: bolt://localhost:7687)
NEO4J_USERNAME - Neo4j username (default: neo4j)
NEO4J_PASSWORD - Neo4j password (required in production)
"""
import argparse
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
# Add parent to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from fedmcp_pipeline.utils.neo4j_client import Neo4jClient
from fedmcp_pipeline.utils.progress import logger
# Import backfill functions
from scripts.backfill_cross_references import run_backfill
def _iso_date(value):
    """Validate a ``YYYY-MM-DD`` command-line value (argparse ``type=`` hook).

    Args:
        value: Raw string taken from the command line.

    Returns:
        The date rendered as a zero-padded ``YYYY-MM-DD`` string. An empty
        string is returned unchanged so that ``--to-date ""`` keeps meaning
        "use the default", as it did before validation was added.

    Raises:
        argparse.ArgumentTypeError: If ``value`` is not a real calendar date
            in ``YYYY-MM-DD`` form; argparse turns this into a usage error.
    """
    if not value:
        return value
    try:
        parsed = datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid date {value!r}; expected YYYY-MM-DD"
        ) from None
    # Re-render so inputs such as "2025-1-5" are forwarded zero-padded.
    return parsed.date().isoformat()


def _positive_int(value):
    """Parse a strictly positive integer (argparse ``type=`` hook).

    Raises:
        argparse.ArgumentTypeError: If ``value`` is not an integer or is <= 0.
    """
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid integer {value!r}") from None
    if number <= 0:
        raise argparse.ArgumentTypeError(
            f"must be a positive integer, got {number}"
        )
    return number


def main() -> int:
    """Main entry point for daily cross-reference extraction.

    Parses the command line, resolves the date window, connects to Neo4j using
    the NEO4J_URI / NEO4J_USERNAME / NEO4J_PASSWORD environment variables and
    calls ``run_backfill`` with ``unprocessed_only=True``.

    Returns:
        0 when ``run_backfill`` completes without raising, 1 when any
        exception is raised while testing the connection or running the
        backfill.

    Raises:
        SystemExit: With status 2 (raised by argparse) when a date is
            malformed, the batch size is not positive, or the resolved
            start date is after the end date.
    """
    import traceback

    parser = argparse.ArgumentParser(
        description="Extract entity mentions from Hansard statements and create MENTIONS relationships",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "--from-date",
        type=_iso_date,
        help="Start date (YYYY-MM-DD). Default: 30 days ago",
    )
    parser.add_argument(
        "--to-date",
        type=_iso_date,
        help="End date (YYYY-MM-DD). Default: today",
    )
    parser.add_argument(
        "--all-unprocessed",
        action="store_true",
        help="Process all unprocessed statements (no date filter)",
    )
    parser.add_argument(
        "--batch-size",
        type=_positive_int,
        default=500,
        help="Batch size for processing (default: 500)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Don't create relationships, just log what would be created",
    )
    args = parser.parse_args()

    # Sample the clock once so both default bounds come from the same instant,
    # even if the job happens to straddle midnight.
    now = datetime.now()

    # Resolve and validate the window before any connection is attempted.
    if args.all_unprocessed:
        from_date = None
        to_date = None
    else:
        to_date = args.to_date or now.strftime("%Y-%m-%d")
        from_date = args.from_date or (now - timedelta(days=30)).strftime("%Y-%m-%d")
        # Zero-padded ISO dates order correctly as plain strings.
        if from_date > to_date:
            parser.error(
                f"start date {from_date} is after end date {to_date}; "
                "pass --from-date explicitly when --to-date is more than 30 days ago"
            )

    # Banner
    logger.info("=" * 70)
    logger.info("DAILY CROSS-REFERENCE EXTRACTION")
    logger.info("=" * 70)
    logger.info(f"Started: {now.isoformat()}")

    if args.all_unprocessed:
        logger.info("Processing ALL unprocessed statements (no date filter)")
    else:
        logger.info(f"Date range: {from_date} to {to_date}")

    if args.dry_run:
        logger.info("DRY RUN MODE - no relationships will be created")

    # Connect to Neo4j
    neo4j_uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
    neo4j_user = os.getenv("NEO4J_USERNAME", "neo4j")
    neo4j_password = os.getenv("NEO4J_PASSWORD", "")

    logger.info(f"Connecting to Neo4j at {neo4j_uri}...")
    neo4j = Neo4jClient(
        uri=neo4j_uri,
        user=neo4j_user,
        password=neo4j_password,
    )

    try:
        neo4j.test_connection()
        logger.info("Connected to Neo4j")

        # Run the backfill with unprocessed-only flag
        run_backfill(
            neo4j,
            from_date=from_date,
            to_date=to_date,
            unprocessed_only=True,  # Always process only unprocessed statements
            limit=None,  # No limit
            batch_size=args.batch_size,
            dry_run=args.dry_run,
        )

        # Log completion
        logger.info("")
        logger.info("=" * 70)
        logger.info("EXTRACTION COMPLETE")
        logger.info("=" * 70)
        logger.info(f"Finished: {datetime.now().isoformat()}")

        # Success covers both "processed some statements" and "nothing to
        # process". The previous `processed >= 0` test was true for every
        # count, so the failure branch after it could never run; failures are
        # reported through exceptions, handled below.
        return 0
    except Exception as e:
        # Top-level boundary: report the failure and turn it into a non-zero
        # exit status for the job runner.
        logger.error(f"Error: {e}")
        traceback.print_exc()
        return 1
    finally:
        neo4j.close()
if __name__ == "__main__":
    # Script entry: hand main()'s integer result to the interpreter as the
    # process exit status.
    exit_code = main()
    sys.exit(exit_code)