#!/usr/bin/env python3
"""
Lightweight hourly update job for Cloud Run.
OPTIMIZED VERSION with:
- Batch MERGE operations instead of loops
- Parallel execution for independent operations
- Smart scheduling (only run debates when House sits)
- Debate ingestion support
Updates only critical data that changes frequently:
- MP party affiliations (e.g., floor-crossers like Chris d'Entremont)
- Cabinet positions
- New bills introduced
- Recent votes (last 24 hours)
- Recent debates (when available)
Designed to be fast (<30 seconds) and low-memory (<1GB).
"""
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, Any
from concurrent.futures import ThreadPoolExecutor
# Add packages to path
SCRIPT_DIR = Path(__file__).parent
PIPELINE_DIR = SCRIPT_DIR.parent
sys.path.insert(0, str(PIPELINE_DIR))
from fedmcp_pipeline.utils.neo4j_client import Neo4jClient
from fedmcp_pipeline.utils.config import Config
from fedmcp_pipeline.utils.progress import logger
# Add fedmcp clients
FEDMCP_PATH = PIPELINE_DIR.parent / "fedmcp" / "src"
sys.path.insert(0, str(FEDMCP_PATH))
from fedmcp.clients.openparliament import OpenParliamentClient
from fedmcp.http import RateLimitedSession
class LightweightUpdater:
"""Fast hourly updates for critical parliamentary data."""
def __init__(self, neo4j_client: Neo4jClient):
self.neo4j = neo4j_client
# Create session with longer timeout for pagination-heavy operations
# 90s timeout allows for fetching multiple pages without timing out
session = RateLimitedSession(
min_request_interval=0.1, # 10 req/s (respectful rate limiting)
default_timeout=90.0, # 90s timeout for long-running queries
max_attempts=5,
backoff_factor=1.0
)
self.op_client = OpenParliamentClient(session=session)
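        # The list_* iterators used below walk the OpenParliament API
        # (presumably page by page); every request they make goes through this
        # shared rate-limited, retrying session.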
self.stats = {
"mps_updated": 0,
"party_changes": [],
"cabinet_changes": [],
"new_bills": 0,
"new_votes": 0,
"debates": 0,
}
def update_mp_parties(self) -> int:
"""
Update MP party affiliations using batch MERGE.
Returns count of MPs updated.
"""
logger.info("Checking for MP party changes...")
# Fetch all current MPs from OpenParliament
mps_list = list(self.op_client.list_mps())
logger.info(f"Fetched {len(mps_list)} current MPs from OpenParliament")
# Get current party affiliations from Neo4j for comparison
current_parties_query = """
MATCH (m:MP)
RETURN m.id as id, m.name as name, m.party as party
"""
current_parties = {row["id"]: row for row in self.neo4j.run_query(current_parties_query)}
# Collect all MP updates
mp_updates = []
        for mp_data in mps_list:
            # MP URLs look like /politicians/<slug>/, so the slug is the
            # second-to-last path segment; guard against a missing URL.
            mp_url = mp_data.get("url", "")
            mp_id = mp_url.split("/")[-2] if mp_url.count("/") >= 2 else None
            mp_name = mp_data.get("name")
            # OpenParliament returns short_name as either a string or a dict
            # like {'en': 'Party Name'}; current_party itself may be null
            party_data = (mp_data.get("current_party") or {}).get("short_name")
            new_party = party_data.get("en") if isinstance(party_data, dict) else party_data
if not mp_id or not new_party:
continue
# Check if party changed
old_record = current_parties.get(mp_id, {})
old_party = old_record.get("party")
if old_party and old_party != new_party:
logger.warning(f"π Party change detected: {mp_name} ({old_party} β {new_party})")
self.stats["party_changes"].append({
"mp_name": mp_name,
"old_party": old_party,
"new_party": new_party,
"timestamp": datetime.now(timezone.utc).isoformat()
})
# Add to batch update
mp_updates.append({
"id": mp_id,
"name": mp_name,
"party": new_party,
"updated_at": datetime.now(timezone.utc).isoformat()
})
# Batch MERGE operation with conservative batch size to avoid connection resets
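        # batch_merge_nodes is assumed to run an UNWIND-based upsert roughly like:
        #   UNWIND $rows AS row
        #   MERGE (m:MP {id: row.id})
        #   SET m += row
        # in chunks of batch_size rows per transaction.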
if mp_updates:
self.neo4j.batch_merge_nodes("MP", mp_updates, merge_keys=["id"], batch_size=100)
logger.success(f"β
Updated {len(mp_updates)} MP records")
return len(mp_updates)
def update_cabinet_positions(self) -> int:
"""
Update cabinet positions using batch operations.
Returns count of cabinet ministers updated.
"""
logger.info("Checking for cabinet changes...")
# Get current cabinet positions from Neo4j
current_cabinet_query = """
MATCH (m:MP)
WHERE m.cabinet_position IS NOT NULL
RETURN m.id as id, m.name as name, m.cabinet_position as position
"""
current_cabinet = {row["id"]: row for row in self.neo4j.run_query(current_cabinet_query)}
# Fetch MPs with cabinet roles from OpenParliament
mps_list = list(self.op_client.list_mps())
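        # Note: this re-fetches the full MP list already pulled in
        # update_mp_parties(); acceptable for an hourly job, though the two
        # methods could share a single fetch if the runtime budget tightens.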
# Collect updates
cabinet_updates = []
cabinet_removals = []
        for mp_data in mps_list:
            mp_url = mp_data.get("url", "")
            # Same guarded slug extraction as update_mp_parties()
            mp_id = mp_url.split("/")[-2] if mp_url.count("/") >= 2 else None
            mp_name = mp_data.get("name")
            current_role = mp_data.get("current_role")
if not mp_id:
continue
# Extract cabinet position if exists
new_position = None
if current_role and "Minister" in current_role:
new_position = current_role
old_record = current_cabinet.get(mp_id, {})
old_position = old_record.get("position")
# Detect changes
if old_position != new_position:
if old_position and not new_position:
logger.warning(f"π Cabinet exit: {mp_name} (was {old_position})")
self.stats["cabinet_changes"].append({
"mp_name": mp_name,
"type": "exit",
"old_position": old_position,
"timestamp": datetime.now(timezone.utc).isoformat()
})
cabinet_removals.append(mp_id)
elif not old_position and new_position:
logger.warning(f"π New cabinet minister: {mp_name} β {new_position}")
self.stats["cabinet_changes"].append({
"mp_name": mp_name,
"type": "appointment",
"new_position": new_position,
"timestamp": datetime.now(timezone.utc).isoformat()
})
cabinet_updates.append({
"id": mp_id,
"cabinet_position": new_position,
"updated_at": datetime.now(timezone.utc).isoformat()
})
elif old_position and new_position:
logger.warning(f"π Cabinet shuffle: {mp_name} ({old_position} β {new_position})")
self.stats["cabinet_changes"].append({
"mp_name": mp_name,
"type": "shuffle",
"old_position": old_position,
"new_position": new_position,
"timestamp": datetime.now(timezone.utc).isoformat()
})
cabinet_updates.append({
"id": mp_id,
"cabinet_position": new_position,
"updated_at": datetime.now(timezone.utc).isoformat()
})
# Batch update cabinet positions
if cabinet_updates:
self.neo4j.batch_merge_nodes("MP", cabinet_updates, merge_keys=["id"], batch_size=100)
# Batch remove cabinet positions
if cabinet_removals:
remove_query = """
UNWIND $ids AS mp_id
MATCH (m:MP {id: mp_id})
REMOVE m.cabinet_position
SET m.updated_at = datetime()
"""
self.neo4j.run_query(remove_query, {"ids": cabinet_removals})
updated_count = len(cabinet_updates) + len(cabinet_removals)
logger.success(f"β
Updated {updated_count} cabinet positions")
return updated_count
def check_new_bills(self, since_hours: int = 24) -> int:
"""
Check for bills introduced in the last N hours.
Args:
since_hours: Look back this many hours
Returns count of new bills.
"""
logger.info(f"Checking for bills introduced in last {since_hours} hours...")
cutoff_date = (datetime.now(timezone.utc) - timedelta(hours=since_hours)).date().isoformat()
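        # The comparison below is at date granularity (ISO date strings), so
        # "last 24 hours" effectively means "introduced on or after the cutoff date".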
new_count = 0
# Get latest bills from OpenParliament
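        # No ordering is assumed from list_bills(), so the loop filters by date
        # instead of breaking early; with a guaranteed newest-first ordering
        # from the API this scan could stop at the first stale bill.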
for bill in self.op_client.list_bills():
introduced_date = bill.get("introduced")
if not introduced_date or introduced_date < cutoff_date:
continue
# Check if bill already exists in Neo4j
bill_number = bill.get("number")
bill_session = bill.get("session")
check_query = """
MATCH (b:Bill {number: $number, session: $session})
RETURN b
"""
existing = self.neo4j.run_query(check_query, {
"number": bill_number,
"session": bill_session
})
if not existing:
# Create new bill node
create_query = """
MERGE (b:Bill {number: $number, session: $session})
SET b.name_en = $name_en,
b.name_fr = $name_fr,
b.introduced = $introduced,
b.sponsor_politician_id = $sponsor_id,
b.status_code = $status,
b.updated_at = datetime()
RETURN b
"""
self.neo4j.run_query(create_query, {
"number": bill_number,
"session": bill_session,
"name_en": bill.get("name", {}).get("en"),
"name_fr": bill.get("name", {}).get("fr"),
"introduced": introduced_date,
"sponsor_id": bill.get("sponsor_politician_url", "").split("/")[-2] if bill.get("sponsor_politician_url") else None,
"status": bill.get("status_code")
})
new_count += 1
logger.info(f"π New bill: {bill_number} - {bill.get('name', {}).get('en', 'Unknown')}")
logger.success(f"β
Found {new_count} new bills")
return new_count
def check_recent_votes(self, since_hours: int = 24) -> int:
"""
Check for votes in the last N hours.
Args:
since_hours: Look back this many hours
Returns count of new votes.
"""
logger.info(f"Checking for votes in last {since_hours} hours...")
cutoff_date = (datetime.now(timezone.utc) - timedelta(hours=since_hours)).date().isoformat()
new_count = 0
for vote in self.op_client.list_votes():
vote_date = vote.get("date")
if not vote_date or vote_date < cutoff_date:
continue
# Parse session to get parliament and session numbers (format: "44-1")
session_str = vote.get("session", "")
parliament_number = None
session_number = None
if "-" in session_str:
parts = session_str.split("-")
if len(parts) == 2:
try:
parliament_number = int(parts[0])
session_number = int(parts[1])
except ValueError:
pass
vote_number = vote.get("number")
if not vote_number:
continue
# Check if vote already exists using canonical vote_number field
check_query = """
MATCH (v:Vote {vote_number: $vote_number})
RETURN v
"""
existing = self.neo4j.run_query(check_query, {"vote_number": vote_number})
if not existing:
# Create new vote node with canonical property names
# Matching schema from votes_xml_import.py
create_query = """
MERGE (v:Vote {vote_number: $vote_number})
SET v.parliament_number = $parliament_number,
v.session_number = $session_number,
v.date_time = datetime($date_time),
v.result = $result,
v.num_yeas = $num_yeas,
v.num_nays = $num_nays,
v.num_paired = $num_paired,
v.updated_at = datetime()
RETURN v
"""
self.neo4j.run_query(create_query, {
"vote_number": vote_number,
"parliament_number": parliament_number,
"session_number": session_number,
"date_time": vote_date, # Will be converted to datetime
"result": vote.get("result"),
"num_yeas": vote.get("yeas"),
"num_nays": vote.get("nays"),
"num_paired": vote.get("paired")
})
new_count += 1
logger.info(f"π³οΈ New vote: #{vote_number} - {vote.get('result')}")
logger.success(f"β
Found {new_count} new votes")
return new_count
def should_run_debate_import(self) -> int:
"""
Check if new debates likely available (smart scheduling).
Returns:
Number of days to look back for imports (0 if should skip)
"""
# 1. Check if any debates published in last 48 hours
recent_cutoff = (datetime.now(timezone.utc) - timedelta(hours=48)).date()
try:
# Query OpenParliament API for recent debates
debates = list(self.op_client.list_debates(limit=10))
recent_debates = [d for d in debates if d.get("date", "") >= str(recent_cutoff)]
if recent_debates:
logger.info(f"Found {len(recent_debates)} debates in last 48h - importing")
return 2 # Look back 2 days
except Exception as e:
logger.warning(f"Error checking debate schedule: {e}")
        # 2. Safety fallback - catch up on missed debates
        # (use the newest debate Document date in Neo4j as a proxy for the
        # last successful import)
query = """
MATCH (d:Document)
WHERE d.public = true
RETURN d.date as last_debate_date
ORDER BY d.date DESC
LIMIT 1
"""
result = self.neo4j.run_query(query)
        if result and result[0].get("last_debate_date"):
            # Neo4j may return the date as a string or a driver Date object;
            # str() normalizes both before parsing.
            last_date = datetime.fromisoformat(str(result[0]["last_debate_date"])).replace(tzinfo=timezone.utc)
days_since = (datetime.now(timezone.utc) - last_date).days
if days_since > 7:
# Look back enough days to catch up + buffer
lookback_days = min(days_since + 2, 30) # Cap at 30 days
logger.info(f"Last debate {days_since} days ago - running catch-up import ({lookback_days} days)")
return lookback_days
logger.info("No recent debates - skipping import")
return 0
def import_recent_debates(self, since_hours: int = 24) -> int:
"""
Import debates from last N hours via OpenParliament API.
Args:
since_hours: Import debates from last N hours
Returns count of debates imported.
"""
logger.info(f"Importing debates from last {since_hours} hours...")
try:
from fedmcp_pipeline.ingest.recent_import import RecentDataImporter
            # Calculate cutoff date (YYYY-MM-DD), matching the other lookbacks
            cutoff = (datetime.now(timezone.utc) - timedelta(hours=since_hours)).date().isoformat()
# Use existing importer (already optimized with MERGE) with our configured 90s timeout client
importer = RecentDataImporter(self.neo4j, start_date=cutoff, op_client=self.op_client)
stats = importer.import_recent_debates(batch_size=5000)
logger.success(f"β
Imported {stats.get('debates', 0)} debates, {stats.get('statements', 0)} statements")
return stats.get('debates', 0)
except Exception as e:
logger.error(f"Error importing debates: {e}")
return 0
def run_all(self) -> Dict[str, Any]:
"""
Run all lightweight updates with parallelization.
Returns statistics dictionary.
"""
start_time = datetime.now(timezone.utc)
logger.info("=" * 60)
logger.info("LIGHTWEIGHT HOURLY UPDATE (OPTIMIZED)")
logger.info(f"Started: {start_time.isoformat()}")
logger.info("=" * 60)
# 1. MP updates (sequential - modify same nodes)
logger.info("\n1. Updating MP data...")
self.stats["mps_updated"] = self.update_mp_parties()
cabinet_count = self.update_cabinet_positions()
# 2. Bills + Votes (parallel - independent operations)
logger.info("\n2. Checking bills and votes (parallel)...")
with ThreadPoolExecutor(max_workers=2) as executor:
bills_future = executor.submit(self.check_new_bills, 24)
votes_future = executor.submit(self.check_recent_votes, 24)
# Get results with error handling
try:
self.stats["new_bills"] = bills_future.result()
except Exception as e:
logger.error(f"Error checking bills: {e}")
self.stats["new_bills"] = 0
try:
self.stats["new_votes"] = votes_future.result()
except Exception as e:
logger.error(f"Error checking votes: {e}")
self.stats["new_votes"] = 0
        # 3. Debates (only run when new debates are likely available)
logger.info("\n3. Checking for recent debates...")
lookback_days = self.should_run_debate_import()
if lookback_days > 0:
self.stats["debates"] = self.import_recent_debates(since_hours=lookback_days * 24)
else:
self.stats["debates"] = 0
end_time = datetime.now(timezone.utc)
duration = (end_time - start_time).total_seconds()
logger.info("=" * 60)
logger.success("β
LIGHTWEIGHT UPDATE COMPLETE")
logger.info(f"Duration: {duration:.1f} seconds")
logger.info(f"MPs updated: {self.stats['mps_updated']}")
logger.info(f"Party changes: {len(self.stats['party_changes'])}")
logger.info(f"Cabinet changes: {len(self.stats['cabinet_changes'])}")
logger.info(f"New bills: {self.stats['new_bills']}")
logger.info(f"New votes: {self.stats['new_votes']}")
logger.info(f"Debates imported: {self.stats['debates']}")
# Log any party changes prominently
if self.stats["party_changes"]:
logger.warning("=" * 60)
logger.warning("β οΈ PARTY CHANGES DETECTED:")
for change in self.stats["party_changes"]:
logger.warning(f" β’ {change['mp_name']}: {change['old_party']} β {change['new_party']}")
logger.warning("=" * 60)
# Log any cabinet changes
if self.stats["cabinet_changes"]:
logger.warning("=" * 60)
logger.warning("π CABINET CHANGES DETECTED:")
            for change in self.stats["cabinet_changes"]:
                if change["type"] == "appointment":
                    logger.warning(f"  • NEW: {change['mp_name']} → {change['new_position']}")
                elif change["type"] == "exit":
                    logger.warning(f"  • EXIT: {change['mp_name']} (was {change['old_position']})")
                elif change["type"] == "shuffle":
                    logger.warning(f"  • SHUFFLE: {change['mp_name']}: {change['old_position']} → {change['new_position']}")
logger.warning("=" * 60)
logger.info("=" * 60)
return self.stats
def main():
"""Main entry point for Cloud Run job."""
# Load config from environment or .env file
config = Config()
# Connect to Neo4j
neo4j_client = Neo4jClient(
uri=config.neo4j_uri,
user=config.neo4j_user,
password=config.neo4j_password
)
try:
# Run lightweight updates
updater = LightweightUpdater(neo4j_client)
stats = updater.run_all()
# Return success
logger.success("Lightweight update completed successfully")
return 0
except Exception as e:
logger.error(f"Lightweight update failed: {e}")
import traceback
traceback.print_exc()
return 1
finally:
neo4j_client.close()
if __name__ == "__main__":
    sys.exit(main())