Skip to main content
Glama
northernvariables

FedMCP - Federal Parliamentary Information

lightweight_update.pyβ€’21.3 kB
#!/usr/bin/env python3
"""
Lightweight hourly update job for Cloud Run.

OPTIMIZED VERSION with:
- Batch MERGE operations instead of loops
- Parallel execution for independent operations
- Smart scheduling (only run debates when House sits)
- Debate ingestion support

Updates only critical data that changes frequently:
- MP party affiliations (e.g., floor-crossers like Chris d'Entremont)
- Cabinet positions
- New bills introduced
- Recent votes (last 24 hours)
- Recent debates (when available)

Designed to be fast (<30 seconds) and low-memory (<1GB).
"""

import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, Any, List, Optional
from concurrent.futures import ThreadPoolExecutor
import logging

# Add packages to path
SCRIPT_DIR = Path(__file__).parent
PIPELINE_DIR = SCRIPT_DIR.parent
sys.path.insert(0, str(PIPELINE_DIR))

from fedmcp_pipeline.utils.neo4j_client import Neo4jClient
from fedmcp_pipeline.utils.config import Config
from fedmcp_pipeline.utils.progress import logger

# Add fedmcp clients
FEDMCP_PATH = PIPELINE_DIR.parent / "fedmcp" / "src"
sys.path.insert(0, str(FEDMCP_PATH))

from fedmcp.clients.openparliament import OpenParliamentClient
from fedmcp.http import RateLimitedSession


class LightweightUpdater:
    """Fast hourly updates for critical parliamentary data."""

    def __init__(self, neo4j_client: Neo4jClient):
        self.neo4j = neo4j_client
        # Create session with longer timeout for pagination-heavy operations.
        # 90s timeout allows for fetching multiple pages without timing out.
        session = RateLimitedSession(
            min_request_interval=0.1,  # 10 req/s (respectful rate limiting)
            default_timeout=90.0,      # 90s timeout for long-running queries
            max_attempts=5,
            backoff_factor=1.0,
        )
        self.op_client = OpenParliamentClient(session=session)
        # Accumulated run statistics, returned by run_all().
        self.stats = {
            "mps_updated": 0,
            "party_changes": [],
            "cabinet_changes": [],
            "new_bills": 0,
            "new_votes": 0,
            "debates": 0,
        }

    @staticmethod
    def _slug_from_url(url: Optional[str]) -> Optional[str]:
        """
        Extract the trailing slug from an OpenParliament URL.

        E.g. "/politicians/jane-doe/" -> "jane-doe". Returns None when the
        URL is missing or empty (the previous inline ``split("/")[-2]``
        pattern raised IndexError on an empty string).
        """
        if not url:
            return None
        parts = url.rstrip("/").split("/")
        return parts[-1] if parts and parts[-1] else None

    def update_mp_parties(self) -> int:
        """
        Update MP party affiliations using batch MERGE.

        Returns count of MPs updated.
        """
        logger.info("Checking for MP party changes...")

        # Fetch all current MPs from OpenParliament
        mps_list = list(self.op_client.list_mps())
        logger.info(f"Fetched {len(mps_list)} current MPs from OpenParliament")

        # Get current party affiliations from Neo4j for comparison
        current_parties_query = """
        MATCH (m:MP)
        RETURN m.id as id, m.name as name, m.party as party
        """
        current_parties = {
            row["id"]: row for row in self.neo4j.run_query(current_parties_query)
        }

        # Collect all MP updates. One timestamp for the whole batch — it is
        # loop-invariant and keeps the batch internally consistent.
        now_iso = datetime.now(timezone.utc).isoformat()
        mp_updates = []
        for mp_data in mps_list:
            mp_id = self._slug_from_url(mp_data.get("url"))
            mp_name = mp_data.get("name")
            # OpenParliament API returns short_name as either a string or a
            # dict {'en': 'Party Name'}; current_party itself may be null.
            party_data = (mp_data.get("current_party") or {}).get("short_name")
            new_party = party_data.get("en") if isinstance(party_data, dict) else party_data

            if not mp_id or not new_party:
                continue

            # Check if party changed
            old_record = current_parties.get(mp_id, {})
            old_party = old_record.get("party")

            if old_party and old_party != new_party:
                logger.warning(f"πŸ”„ Party change detected: {mp_name} ({old_party} β†’ {new_party})")
                self.stats["party_changes"].append({
                    "mp_name": mp_name,
                    "old_party": old_party,
                    "new_party": new_party,
                    "timestamp": now_iso,
                })

            # Add to batch update
            mp_updates.append({
                "id": mp_id,
                "name": mp_name,
                "party": new_party,
                "updated_at": now_iso,
            })

        # Batch MERGE operation with conservative batch size to avoid
        # connection resets.
        if mp_updates:
            self.neo4j.batch_merge_nodes("MP", mp_updates, merge_keys=["id"], batch_size=100)
            logger.success(f"βœ… Updated {len(mp_updates)} MP records")

        return len(mp_updates)

    def update_cabinet_positions(self) -> int:
        """
        Update cabinet positions using batch operations.

        Returns count of cabinet ministers updated (appointments, shuffles,
        and removals combined).
        """
        logger.info("Checking for cabinet changes...")

        # Get current cabinet positions from Neo4j
        current_cabinet_query = """
        MATCH (m:MP)
        WHERE m.cabinet_position IS NOT NULL
        RETURN m.id as id, m.name as name, m.cabinet_position as position
        """
        current_cabinet = {
            row["id"]: row for row in self.neo4j.run_query(current_cabinet_query)
        }

        # Fetch MPs with cabinet roles from OpenParliament
        mps_list = list(self.op_client.list_mps())

        # Collect updates
        cabinet_updates = []
        cabinet_removals = []
        now_iso = datetime.now(timezone.utc).isoformat()

        for mp_data in mps_list:
            mp_id = self._slug_from_url(mp_data.get("url"))
            mp_name = mp_data.get("name")
            current_role = mp_data.get("current_role")

            if not mp_id:
                continue

            # Extract cabinet position if exists
            new_position = None
            if current_role and "Minister" in current_role:
                new_position = current_role

            old_record = current_cabinet.get(mp_id, {})
            old_position = old_record.get("position")

            # Detect changes: exit, appointment, or shuffle.
            if old_position != new_position:
                if old_position and not new_position:
                    logger.warning(f"πŸ“‰ Cabinet exit: {mp_name} (was {old_position})")
                    self.stats["cabinet_changes"].append({
                        "mp_name": mp_name,
                        "type": "exit",
                        "old_position": old_position,
                        "timestamp": now_iso,
                    })
                    cabinet_removals.append(mp_id)
                elif not old_position and new_position:
                    logger.warning(f"πŸ“ˆ New cabinet minister: {mp_name} β†’ {new_position}")
                    self.stats["cabinet_changes"].append({
                        "mp_name": mp_name,
                        "type": "appointment",
                        "new_position": new_position,
                        "timestamp": now_iso,
                    })
                    cabinet_updates.append({
                        "id": mp_id,
                        "cabinet_position": new_position,
                        "updated_at": now_iso,
                    })
                elif old_position and new_position:
                    logger.warning(f"πŸ”„ Cabinet shuffle: {mp_name} ({old_position} β†’ {new_position})")
                    self.stats["cabinet_changes"].append({
                        "mp_name": mp_name,
                        "type": "shuffle",
                        "old_position": old_position,
                        "new_position": new_position,
                        "timestamp": now_iso,
                    })
                    cabinet_updates.append({
                        "id": mp_id,
                        "cabinet_position": new_position,
                        "updated_at": now_iso,
                    })

        # Batch update cabinet positions
        if cabinet_updates:
            self.neo4j.batch_merge_nodes("MP", cabinet_updates, merge_keys=["id"], batch_size=100)

        # Batch remove cabinet positions
        if cabinet_removals:
            remove_query = """
            UNWIND $ids AS mp_id
            MATCH (m:MP {id: mp_id})
            REMOVE m.cabinet_position
            SET m.updated_at = datetime()
            """
            self.neo4j.run_query(remove_query, {"ids": cabinet_removals})

        updated_count = len(cabinet_updates) + len(cabinet_removals)
        logger.success(f"βœ… Updated {updated_count} cabinet positions")
        return updated_count

    def check_new_bills(self, since_hours: int = 24) -> int:
        """
        Check for bills introduced in the last N hours.

        Args:
            since_hours: Look back this many hours

        Returns count of new bills.
        """
        logger.info(f"Checking for bills introduced in last {since_hours} hours...")

        cutoff_date = (datetime.now(timezone.utc) - timedelta(hours=since_hours)).date().isoformat()
        new_count = 0

        # Get latest bills from OpenParliament
        for bill in self.op_client.list_bills():
            introduced_date = bill.get("introduced")
            if not introduced_date or introduced_date < cutoff_date:
                continue

            # Check if bill already exists in Neo4j
            bill_number = bill.get("number")
            bill_session = bill.get("session")
            check_query = """
            MATCH (b:Bill {number: $number, session: $session})
            RETURN b
            """
            existing = self.neo4j.run_query(check_query, {
                "number": bill_number,
                "session": bill_session,
            })

            if not existing:
                # Create new bill node. "name" may be an explicit null in the
                # API payload, so coalesce before .get().
                bill_name = bill.get("name") or {}
                create_query = """
                MERGE (b:Bill {number: $number, session: $session})
                SET b.name_en = $name_en,
                    b.name_fr = $name_fr,
                    b.introduced = $introduced,
                    b.sponsor_politician_id = $sponsor_id,
                    b.status_code = $status,
                    b.updated_at = datetime()
                RETURN b
                """
                self.neo4j.run_query(create_query, {
                    "number": bill_number,
                    "session": bill_session,
                    "name_en": bill_name.get("en"),
                    "name_fr": bill_name.get("fr"),
                    "introduced": introduced_date,
                    "sponsor_id": self._slug_from_url(bill.get("sponsor_politician_url")),
                    "status": bill.get("status_code"),
                })
                new_count += 1
                logger.info(f"πŸ“œ New bill: {bill_number} - {bill_name.get('en', 'Unknown')}")

        logger.success(f"βœ… Found {new_count} new bills")
        return new_count

    def check_recent_votes(self, since_hours: int = 24) -> int:
        """
        Check for votes in the last N hours.

        Args:
            since_hours: Look back this many hours

        Returns count of new votes.
        """
        logger.info(f"Checking for votes in last {since_hours} hours...")

        cutoff_date = (datetime.now(timezone.utc) - timedelta(hours=since_hours)).date().isoformat()
        new_count = 0

        for vote in self.op_client.list_votes():
            vote_date = vote.get("date")
            if not vote_date or vote_date < cutoff_date:
                continue

            # Parse session to get parliament and session numbers (format: "44-1")
            session_str = vote.get("session", "")
            parliament_number = None
            session_number = None
            if "-" in session_str:
                parts = session_str.split("-")
                if len(parts) == 2:
                    try:
                        parliament_number = int(parts[0])
                        session_number = int(parts[1])
                    except ValueError:
                        pass

            vote_number = vote.get("number")
            if not vote_number:
                continue

            # Check if vote already exists using canonical vote_number field
            check_query = """
            MATCH (v:Vote {vote_number: $vote_number})
            RETURN v
            """
            existing = self.neo4j.run_query(check_query, {"vote_number": vote_number})

            if not existing:
                # Create new vote node with canonical property names,
                # matching schema from votes_xml_import.py.
                create_query = """
                MERGE (v:Vote {vote_number: $vote_number})
                SET v.parliament_number = $parliament_number,
                    v.session_number = $session_number,
                    v.date_time = datetime($date_time),
                    v.result = $result,
                    v.num_yeas = $num_yeas,
                    v.num_nays = $num_nays,
                    v.num_paired = $num_paired,
                    v.updated_at = datetime()
                RETURN v
                """
                self.neo4j.run_query(create_query, {
                    "vote_number": vote_number,
                    "parliament_number": parliament_number,
                    "session_number": session_number,
                    "date_time": vote_date,  # Will be converted to datetime
                    "result": vote.get("result"),
                    "num_yeas": vote.get("yeas"),
                    "num_nays": vote.get("nays"),
                    "num_paired": vote.get("paired"),
                })
                new_count += 1
                logger.info(f"πŸ—³οΈ New vote: #{vote_number} - {vote.get('result')}")

        logger.success(f"βœ… Found {new_count} new votes")
        return new_count

    def should_run_debate_import(self) -> int:
        """
        Check if new debates likely available (smart scheduling).

        Returns:
            Number of days to look back for imports (0 if should skip)
        """
        # 1. Check if any debates published in last 48 hours
        recent_cutoff = (datetime.now(timezone.utc) - timedelta(hours=48)).date()
        try:
            # Query OpenParliament API for recent debates
            debates = list(self.op_client.list_debates(limit=10))
            recent_debates = [d for d in debates if d.get("date", "") >= str(recent_cutoff)]
            if recent_debates:
                logger.info(f"Found {len(recent_debates)} debates in last 48h - importing")
                return 2  # Look back 2 days
        except Exception as e:
            logger.warning(f"Error checking debate schedule: {e}")

        # 2. Safety fallback - catch up on missed debates
        # (Check Neo4j for last successful import timestamp)
        query = """
        MATCH (d:Document)
        WHERE d.public = true
        RETURN d.date as last_debate_date
        ORDER BY d.date DESC
        LIMIT 1
        """
        result = self.neo4j.run_query(query)
        if result and len(result) > 0:
            last_date_str = result[0]["last_debate_date"]
            # Guard against a null date, and coerce Neo4j temporal values to
            # str so fromisoformat() accepts them.
            if last_date_str:
                last_date = datetime.fromisoformat(str(last_date_str)).replace(tzinfo=timezone.utc)
                days_since = (datetime.now(timezone.utc) - last_date).days
                if days_since > 7:
                    # Look back enough days to catch up + buffer
                    lookback_days = min(days_since + 2, 30)  # Cap at 30 days
                    logger.info(
                        f"Last debate {days_since} days ago - running catch-up import ({lookback_days} days)"
                    )
                    return lookback_days

        logger.info("No recent debates - skipping import")
        return 0

    def import_recent_debates(self, since_hours: int = 24) -> int:
        """
        Import debates from last N hours via OpenParliament API.

        Args:
            since_hours: Import debates from last N hours

        Returns count of debates imported.
        """
        logger.info(f"Importing debates from last {since_hours} hours...")

        try:
            from fedmcp_pipeline.ingest.recent_import import RecentDataImporter

            # Calculate cutoff date
            cutoff = (datetime.now(timezone.utc) - timedelta(hours=since_hours)).isoformat()[:10]  # YYYY-MM-DD

            # Use existing importer (already optimized with MERGE) with our
            # configured 90s timeout client.
            importer = RecentDataImporter(self.neo4j, start_date=cutoff, op_client=self.op_client)
            stats = importer.import_recent_debates(batch_size=5000)

            logger.success(
                f"βœ… Imported {stats.get('debates', 0)} debates, {stats.get('statements', 0)} statements"
            )
            return stats.get('debates', 0)
        except Exception as e:
            logger.error(f"Error importing debates: {e}")
            return 0

    def run_all(self) -> Dict[str, Any]:
        """
        Run all lightweight updates with parallelization.

        Returns statistics dictionary.
        """
        start_time = datetime.now(timezone.utc)

        logger.info("=" * 60)
        logger.info("LIGHTWEIGHT HOURLY UPDATE (OPTIMIZED)")
        logger.info(f"Started: {start_time.isoformat()}")
        logger.info("=" * 60)

        # 1. MP updates (sequential - modify same nodes)
        logger.info("\n1. Updating MP data...")
        self.stats["mps_updated"] = self.update_mp_parties()
        # Previously discarded; recorded additively so callers can see it.
        self.stats["cabinet_updates"] = self.update_cabinet_positions()

        # 2. Bills + Votes (parallel - independent operations)
        logger.info("\n2. Checking bills and votes (parallel)...")
        with ThreadPoolExecutor(max_workers=2) as executor:
            bills_future = executor.submit(self.check_new_bills, 24)
            votes_future = executor.submit(self.check_recent_votes, 24)

            # Get results with error handling
            try:
                self.stats["new_bills"] = bills_future.result()
            except Exception as e:
                logger.error(f"Error checking bills: {e}")
                self.stats["new_bills"] = 0

            try:
                self.stats["new_votes"] = votes_future.result()
            except Exception as e:
                logger.error(f"Error checking votes: {e}")
                self.stats["new_votes"] = 0

        # 3. Debates (only if House is sitting)
        logger.info("\n3. Checking for recent debates...")
        lookback_days = self.should_run_debate_import()
        if lookback_days > 0:
            self.stats["debates"] = self.import_recent_debates(since_hours=lookback_days * 24)
        else:
            self.stats["debates"] = 0

        end_time = datetime.now(timezone.utc)
        duration = (end_time - start_time).total_seconds()

        logger.info("=" * 60)
        logger.success("βœ… LIGHTWEIGHT UPDATE COMPLETE")
        logger.info(f"Duration: {duration:.1f} seconds")
        logger.info(f"MPs updated: {self.stats['mps_updated']}")
        logger.info(f"Party changes: {len(self.stats['party_changes'])}")
        logger.info(f"Cabinet changes: {len(self.stats['cabinet_changes'])}")
        logger.info(f"New bills: {self.stats['new_bills']}")
        logger.info(f"New votes: {self.stats['new_votes']}")
        logger.info(f"Debates imported: {self.stats['debates']}")

        # Log any party changes prominently
        if self.stats["party_changes"]:
            logger.warning("=" * 60)
            logger.warning("⚠️ PARTY CHANGES DETECTED:")
            for change in self.stats["party_changes"]:
                logger.warning(f"  β€’ {change['mp_name']}: {change['old_party']} β†’ {change['new_party']}")
            logger.warning("=" * 60)

        # Log any cabinet changes
        if self.stats["cabinet_changes"]:
            logger.warning("=" * 60)
            logger.warning("πŸ“‹ CABINET CHANGES DETECTED:")
            for change in self.stats["cabinet_changes"]:
                if change["type"] == "appointment":
                    logger.warning(f"  β€’ NEW: {change['mp_name']} β†’ {change['new_position']}")
                elif change["type"] == "exit":
                    logger.warning(f"  β€’ EXIT: {change['mp_name']} (was {change['old_position']})")
                elif change["type"] == "shuffle":
                    logger.warning(
                        f"  β€’ SHUFFLE: {change['mp_name']}: {change['old_position']} β†’ {change['new_position']}"
                    )
            logger.warning("=" * 60)

        logger.info("=" * 60)
        return self.stats


def main():
    """Main entry point for Cloud Run job."""
    # Load config from environment or .env file
    config = Config()

    # Connect to Neo4j
    neo4j_client = Neo4jClient(
        uri=config.neo4j_uri,
        user=config.neo4j_user,
        password=config.neo4j_password,
    )

    try:
        # Run lightweight updates
        updater = LightweightUpdater(neo4j_client)
        updater.run_all()

        # Return success
        logger.success("Lightweight update completed successfully")
        return 0
    except Exception as e:
        logger.error(f"Lightweight update failed: {e}")
        import traceback
        traceback.print_exc()
        return 1
    finally:
        neo4j_client.close()


if __name__ == "__main__":
    sys.exit(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/northernvariables/FedMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server