#!/usr/bin/env python3
"""
Lightweight hourly update job for Cloud Run.
OPTIMIZED VERSION with:
- Batch MERGE operations instead of loops
- Parallel execution for independent operations
- Smart scheduling (only run debates when House sits)
- Debate ingestion support
- Bill amendment detection
Updates only critical data that changes frequently:
- MP party affiliations (e.g., floor-crossers like Chris d'Entrement)
- Cabinet positions
- New bills introduced
- Recent votes (last 24 hours)
- Recent debates (when available)
- Bill amendments (compares versions, stores text history on sections)
Designed to be fast (<30 seconds) and low-memory (<1GB).
"""
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, Any, List
from concurrent.futures import ThreadPoolExecutor
import logging
# Add packages to path
SCRIPT_DIR = Path(__file__).parent
PIPELINE_DIR = SCRIPT_DIR.parent
sys.path.insert(0, str(PIPELINE_DIR))
from fedmcp_pipeline.utils.neo4j_client import Neo4jClient
from fedmcp_pipeline.utils.config import Config
from fedmcp_pipeline.utils.progress import logger
# Add fedmcp clients
FEDMCP_PATH = PIPELINE_DIR.parent / "fedmcp" / "src"
sys.path.insert(0, str(FEDMCP_PATH))
from fedmcp.clients.openparliament import OpenParliamentClient
from fedmcp.http import RateLimitedSession
# Import amendment detection
from fedmcp_pipeline.ingest.bill_amendments import detect_bill_amendments
class LightweightUpdater:
"""Fast hourly updates for critical parliamentary data."""
def __init__(self, neo4j_client: Neo4jClient):
self.neo4j = neo4j_client
# Create session with longer timeout for pagination-heavy operations
# 90s timeout allows for fetching multiple pages without timing out
session = RateLimitedSession(
min_request_interval=0.1, # 10 req/s (respectful rate limiting)
default_timeout=90.0, # 90s timeout for long-running queries
max_attempts=5,
backoff_factor=1.0
)
self.op_client = OpenParliamentClient(session=session)
self.stats = {
"mps_updated": 0,
"party_changes": [],
"cabinet_changes": [],
"bills_updated": 0,
"new_bills": [], # List of newly detected bills
"text_backfilled": 0, # Bills that got text backfilled
"amendments_detected": 0, # Bills with new amendments detected
"new_votes": 0,
"debates": 0,
}
def update_mp_parties(self) -> int:
"""
Update MP party affiliations using batch MERGE.
Returns count of MPs updated.
"""
logger.info("Checking for MP party changes...")
# Fetch all current MPs from OpenParliament
mps_list = list(self.op_client.list_mps())
logger.info(f"Fetched {len(mps_list)} current MPs from OpenParliament")
# Get current party affiliations from Neo4j for comparison
current_parties_query = """
MATCH (m:MP)
RETURN m.id as id, m.name as name, m.party as party
"""
current_parties = {row["id"]: row for row in self.neo4j.run_query(current_parties_query)}
# Collect all MP updates
mp_updates = []
for mp_data in mps_list:
mp_id = mp_data.get("url", "").split("/")[-2]
mp_name = mp_data.get("name")
# OpenParliament API returns short_name as either a string or dict {'en': 'Party Name'}
party_data = mp_data.get("current_party", {}).get("short_name")
new_party = party_data.get("en") if isinstance(party_data, dict) else party_data
if not mp_id or not new_party:
continue
# Check if party changed
old_record = current_parties.get(mp_id, {})
old_party = old_record.get("party")
if old_party and old_party != new_party:
logger.warning(f"🔄 Party change detected: {mp_name} ({old_party} → {new_party})")
self.stats["party_changes"].append({
"mp_name": mp_name,
"old_party": old_party,
"new_party": new_party,
"timestamp": datetime.now(timezone.utc).isoformat()
})
# Add to batch update
mp_updates.append({
"id": mp_id,
"name": mp_name,
"party": new_party,
"updated_at": datetime.now(timezone.utc).isoformat()
})
# Batch MERGE operation with conservative batch size to avoid connection resets
if mp_updates:
self.neo4j.batch_merge_nodes("MP", mp_updates, merge_keys=["id"], batch_size=100)
logger.success(f"✅ Updated {len(mp_updates)} MP records")
return len(mp_updates)
def update_cabinet_positions(self) -> int:
"""
Update cabinet positions using batch operations.
Returns count of cabinet ministers updated.
"""
logger.info("Checking for cabinet changes...")
# Get current cabinet positions from Neo4j
current_cabinet_query = """
MATCH (m:MP)
WHERE m.cabinet_position IS NOT NULL
RETURN m.id as id, m.name as name, m.cabinet_position as position
"""
current_cabinet = {row["id"]: row for row in self.neo4j.run_query(current_cabinet_query)}
# Fetch MPs with cabinet roles from OpenParliament
mps_list = list(self.op_client.list_mps())
# Collect updates
cabinet_updates = []
cabinet_removals = []
for mp_data in mps_list:
mp_id = mp_data.get("url", "").split("/")[-2]
mp_name = mp_data.get("name")
current_role = mp_data.get("current_role")
if not mp_id:
continue
# Extract cabinet position if exists
new_position = None
if current_role and "Minister" in current_role:
new_position = current_role
old_record = current_cabinet.get(mp_id, {})
old_position = old_record.get("position")
# Detect changes
if old_position != new_position:
if old_position and not new_position:
logger.warning(f"📉 Cabinet exit: {mp_name} (was {old_position})")
self.stats["cabinet_changes"].append({
"mp_name": mp_name,
"type": "exit",
"old_position": old_position,
"timestamp": datetime.now(timezone.utc).isoformat()
})
cabinet_removals.append(mp_id)
elif not old_position and new_position:
logger.warning(f"📈 New cabinet minister: {mp_name} → {new_position}")
self.stats["cabinet_changes"].append({
"mp_name": mp_name,
"type": "appointment",
"new_position": new_position,
"timestamp": datetime.now(timezone.utc).isoformat()
})
cabinet_updates.append({
"id": mp_id,
"cabinet_position": new_position,
"updated_at": datetime.now(timezone.utc).isoformat()
})
elif old_position and new_position:
logger.warning(f"🔄 Cabinet shuffle: {mp_name} ({old_position} → {new_position})")
self.stats["cabinet_changes"].append({
"mp_name": mp_name,
"type": "shuffle",
"old_position": old_position,
"new_position": new_position,
"timestamp": datetime.now(timezone.utc).isoformat()
})
cabinet_updates.append({
"id": mp_id,
"cabinet_position": new_position,
"updated_at": datetime.now(timezone.utc).isoformat()
})
# Batch update cabinet positions
if cabinet_updates:
self.neo4j.batch_merge_nodes("MP", cabinet_updates, merge_keys=["id"], batch_size=100)
# Batch remove cabinet positions
if cabinet_removals:
remove_query = """
UNWIND $ids AS mp_id
MATCH (m:MP {id: mp_id})
REMOVE m.cabinet_position
SET m.updated_at = datetime()
"""
self.neo4j.run_query(remove_query, {"ids": cabinet_removals})
updated_count = len(cabinet_updates) + len(cabinet_removals)
logger.success(f"✅ Updated {updated_count} cabinet positions")
return updated_count
def update_bills_from_legisinfo(self) -> dict:
"""
Update all bills from LEGISinfo JSON bulk export.
This is fast (~2 seconds) and updates:
- New bills introduced
- Status changes (House → Committee → Senate → Royal Assent)
- Stage progression dates
- Bill type, originating chamber, sponsor info
For NEW bills, automatically triggers bill text ingestion.
Also retries text ingestion for existing bills missing text.
Returns dict with count, new_bills list, and text_backfilled count.
"""
logger.info("Updating bills from LEGISinfo...")
try:
from fedmcp_pipeline.ingest.parliament import ingest_bills_from_legisinfo_json
result = ingest_bills_from_legisinfo_json(self.neo4j, batch_size=10000)
count = result.get("count", 0)
new_bills = result.get("new_bills", [])
logger.success(f"✅ Updated {count} bills from LEGISinfo")
# Ingest text for new bills
if new_bills:
self._ingest_text_for_bills(new_bills, label="new")
# Also try to backfill text for existing bills missing it
bills_missing_text = self._get_bills_missing_text()
if bills_missing_text:
backfilled = self._ingest_text_for_bills(bills_missing_text, label="backfill")
result["text_backfilled"] = backfilled
else:
result["text_backfilled"] = 0
return result
except Exception as e:
logger.error(f"Error updating bills: {e}")
return {"count": 0, "new_bills": [], "text_backfilled": 0}
def _get_bills_missing_text(self) -> list:
"""
Get bills from current session that don't have full text yet.
Only backfills current session to keep hourly updates fast.
Historical sessions can be backfilled separately if needed.
Returns list of bill dicts with number, session, parliament info.
"""
# Get current session
current_session_result = self.neo4j.run_query("""
MATCH (b:Bill)
RETURN b.session as session
ORDER BY b.session DESC
LIMIT 1
""")
current_session = current_session_result[0]["session"] if current_session_result else "45-1"
result = self.neo4j.run_query("""
MATCH (b:Bill)
WHERE b.session = $current_session
AND (b.full_text_en IS NULL OR b.full_text_en = '')
RETURN b.number as number,
b.session as session,
b.is_government_bill as is_government_bill
ORDER BY b.number
""", {"current_session": current_session})
bills = []
for row in result:
session = row["session"]
# Parse parliament/session from session string like "45-1"
try:
parts = session.split("-")
parliament = int(parts[0])
session_num = int(parts[1])
except (ValueError, IndexError):
continue
bills.append({
"number": row["number"],
"session": session,
"parliament": parliament,
"session_number": session_num,
"is_government_bill": row.get("is_government_bill", False),
})
return bills
def _ingest_text_for_bills(self, bills: list, label: str = "new") -> int:
"""
Ingest bill text/structure for a list of bills.
Args:
bills: List of bill dicts with number, parliament, session_number, is_government_bill
label: Label for logging ("new" for new bills, "backfill" for missing text)
Returns count of bills with text successfully ingested.
"""
logger.info(f"📄 Ingesting text for {len(bills)} {label} bill(s)...")
try:
from fedmcp_pipeline.ingest.bill_structure import ingest_bill_structure
except ImportError as e:
logger.error(f"Failed to import bill_structure module: {e}")
return 0
ingested = 0
for bill in bills:
bill_number = bill.get("number")
parliament = bill.get("parliament")
session_num = bill.get("session_number")
is_gov = bill.get("is_government_bill", False)
if not all([bill_number, parliament, session_num]):
logger.warning(f" Skipping {bill_number}: missing parliament/session info")
continue
try:
logger.info(f" [{label}] Ingesting text for {bill_number}...")
result = ingest_bill_structure(
self.neo4j,
parliament=parliament,
session=session_num,
bill_number=bill_number,
version=1,
is_government=is_gov,
include_all_versions=False,
include_full_text=True,
)
if result.get("error"):
# Don't log warnings for backfill - just skip silently (will retry next hour)
if label == "new":
logger.warning(f" {bill_number}: {result['error']}")
else:
sections = result.get("sections", 0)
parts = result.get("parts", 0)
logger.info(f" ✅ {bill_number}: {parts} parts, {sections} sections")
ingested += 1
except Exception as e:
# Don't log warnings for backfill failures - will retry next hour
if label == "new":
logger.warning(f" {bill_number}: Failed - {e}")
logger.success(f"✅ Ingested text for {ingested}/{len(bills)} {label} bills")
return ingested
def detect_amendments_for_bills(self, bills: list) -> int:
"""
Detect amendments for bills that may have new versions.
This compares different versions of each bill (first reading, committee,
third reading, royal assent) and stores text history on sections that changed.
Args:
bills: List of bill dicts with number, parliament, session_number, is_government_bill, status
Returns:
Count of bills with amendments detected.
"""
if not bills:
return 0
logger.info(f"🔍 Checking {len(bills)} bill(s) for amendments...")
amendments_found = 0
for bill in bills:
bill_number = bill.get("number")
parliament = bill.get("parliament")
session_num = bill.get("session_number")
is_gov = bill.get("is_government_bill", False)
status = bill.get("status", "")
if not all([bill_number, parliament, session_num]):
continue
# Check if this is a Royal Assent bill (final check)
is_royal_assent = "Royal Assent" in (status or "")
try:
result = detect_bill_amendments(
self.neo4j,
parliament=parliament,
session=session_num,
bill_number=bill_number,
is_government=is_gov,
is_royal_assent=is_royal_assent,
)
if result.get("has_amendments"):
amendments_found += 1
total_diffs = result.get("total_diffs", 0)
logger.info(f" 📝 {bill_number}: {total_diffs} amendment(s) detected")
except Exception as e:
logger.debug(f" {bill_number}: Could not check amendments - {e}")
logger.success(f"✅ Found amendments in {amendments_found}/{len(bills)} bills")
return amendments_found
def _get_bills_for_amendment_check(self) -> list:
"""
Get bills that should be checked for amendments.
Returns bills that either:
1. Have never been checked (has_amendments IS NULL) AND have passed second reading
2. Have been checked but haven't reached Royal Assent yet (could have new versions)
- Only re-check if last check was >24 hours ago to avoid hammering the API
3. Just reached Royal Assent but haven't had final check (checked_at_royal_assent IS NULL)
- Bills at Royal Assent need ONE final check to capture any last amendments
- After final check, they're locked forever
Bills at Royal Assent WITH checked_at_royal_assent=true are never re-checked.
"""
# Get current session from most recent bill
current_session_result = self.neo4j.run_query("""
MATCH (b:Bill)
RETURN b.session as session
ORDER BY b.session DESC
LIMIT 1
""")
current_session = current_session_result[0]["session"] if current_session_result else "45-1"
result = self.neo4j.run_query("""
MATCH (b:Bill)
WHERE b.session = $current_session
AND (
// Case 1: Never checked, past second reading (House OR Senate)
// For House bills (C-*): check passed_house_second_reading
// For Senate bills (S-*): check passed_senate_second_reading
(b.has_amendments IS NULL
AND (b.passed_house_second_reading IS NOT NULL
OR b.passed_senate_second_reading IS NOT NULL
OR b.status CONTAINS 'Committee'
OR b.status CONTAINS 'Third'
OR b.status CONTAINS 'Royal Assent'))
OR
// Case 2: Already checked but not at Royal Assent (could get new amendments)
// Only re-check if last check was >24 hours ago
(b.has_amendments IS NOT NULL
AND NOT (b.status CONTAINS 'Royal Assent')
AND (b.last_amendment_check IS NULL
OR b.last_amendment_check < datetime() - duration('P1D')))
OR
// Case 3: Just reached Royal Assent - need final check
// Bills at Royal Assent need ONE final check to capture any last-minute amendments
(b.status CONTAINS 'Royal Assent'
AND (b.checked_at_royal_assent IS NULL OR b.checked_at_royal_assent = false))
)
RETURN b.number as number,
b.session as session,
b.is_government_bill as is_government_bill,
b.has_amendments as has_amendments,
b.status as status
ORDER BY
// Prioritize: 1) never-checked, 2) Royal Assent needing final check, 3) re-checks
CASE
WHEN b.has_amendments IS NULL THEN 0
WHEN b.status CONTAINS 'Royal Assent' THEN 1
ELSE 2
END,
b.number
""", {"current_session": current_session})
bills = []
for row in result:
session = row["session"]
try:
parts = session.split("-")
parliament = int(parts[0])
session_num = int(parts[1])
except (ValueError, IndexError):
continue
bills.append({
"number": row["number"],
"session": session,
"parliament": parliament,
"session_number": session_num,
"is_government_bill": row.get("is_government_bill", False),
"status": row.get("status", ""),
})
return bills
def check_recent_votes(self, since_hours: int = 24) -> int:
"""
Check for votes in the last N hours.
Args:
since_hours: Look back this many hours
Returns count of new votes.
"""
logger.info(f"Checking for votes in last {since_hours} hours...")
cutoff_date = (datetime.now(timezone.utc) - timedelta(hours=since_hours)).date().isoformat()
new_count = 0
for vote in self.op_client.list_votes():
vote_date = vote.get("date")
if not vote_date or vote_date < cutoff_date:
continue
# Parse session to get parliament and session numbers (format: "44-1")
session_str = vote.get("session", "")
parliament_number = None
session_number = None
if "-" in session_str:
parts = session_str.split("-")
if len(parts) == 2:
try:
parliament_number = int(parts[0])
session_number = int(parts[1])
except ValueError:
pass
vote_number = vote.get("number")
if not vote_number:
continue
# Check if vote already exists using canonical vote_number field
check_query = """
MATCH (v:Vote {vote_number: $vote_number})
RETURN v
"""
existing = self.neo4j.run_query(check_query, {"vote_number": vote_number})
if not existing:
# Create new vote node with canonical property names
# Matching schema from votes_xml_import.py
create_query = """
MERGE (v:Vote {vote_number: $vote_number})
SET v.parliament_number = $parliament_number,
v.session_number = $session_number,
v.date_time = datetime($date_time),
v.result = $result,
v.num_yeas = $num_yeas,
v.num_nays = $num_nays,
v.num_paired = $num_paired,
v.updated_at = datetime()
RETURN v
"""
self.neo4j.run_query(create_query, {
"vote_number": vote_number,
"parliament_number": parliament_number,
"session_number": session_number,
"date_time": vote_date, # Will be converted to datetime
"result": vote.get("result"),
"num_yeas": vote.get("yeas"),
"num_nays": vote.get("nays"),
"num_paired": vote.get("paired")
})
new_count += 1
logger.info(f"🗳️ New vote: #{vote_number} - {vote.get('result')}")
logger.success(f"✅ Found {new_count} new votes")
return new_count
def should_run_debate_import(self) -> int:
"""
Check if new debates likely available (smart scheduling).
Returns:
Number of days to look back for imports (0 if should skip)
"""
from datetime import datetime, timedelta
# 1. Check if any debates published in last 48 hours
recent_cutoff = (datetime.now(timezone.utc) - timedelta(hours=48)).date()
try:
# Query OpenParliament API for recent debates
debates = list(self.op_client.list_debates(limit=10))
recent_debates = [d for d in debates if d.get("date", "") >= str(recent_cutoff)]
if recent_debates:
logger.info(f"Found {len(recent_debates)} debates in last 48h - importing")
return 2 # Look back 2 days
except Exception as e:
logger.warning(f"Error checking debate schedule: {e}")
# 2. Safety fallback - catch up on missed debates
# (Check Neo4j for last successful import timestamp)
query = """
MATCH (d:Document)
WHERE d.public = true
RETURN d.date as last_debate_date
ORDER BY d.date DESC
LIMIT 1
"""
result = self.neo4j.run_query(query)
if result and len(result) > 0:
last_date_str = result[0]["last_debate_date"]
last_date = datetime.fromisoformat(last_date_str).replace(tzinfo=timezone.utc)
days_since = (datetime.now(timezone.utc) - last_date).days
if days_since > 7:
# Look back enough days to catch up + buffer
lookback_days = min(days_since + 2, 30) # Cap at 30 days
logger.info(f"Last debate {days_since} days ago - running catch-up import ({lookback_days} days)")
return lookback_days
logger.info("No recent debates - skipping import")
return 0
def import_recent_debates(self, since_hours: int = 24) -> int:
"""
Import debates from last N hours via OpenParliament API.
Args:
since_hours: Import debates from last N hours
Returns count of debates imported.
"""
logger.info(f"Importing debates from last {since_hours} hours...")
try:
from fedmcp_pipeline.ingest.recent_import import RecentDataImporter
# Calculate cutoff date
cutoff = (datetime.now(timezone.utc) - timedelta(hours=since_hours)).isoformat()[:10] # YYYY-MM-DD
# Use existing importer (already optimized with MERGE) with our configured 90s timeout client
importer = RecentDataImporter(self.neo4j, start_date=cutoff, op_client=self.op_client)
stats = importer.import_recent_debates(batch_size=5000)
logger.success(f"✅ Imported {stats.get('debates', 0)} debates, {stats.get('statements', 0)} statements")
return stats.get('debates', 0)
except Exception as e:
logger.error(f"Error importing debates: {e}")
return 0
def run_all(self) -> Dict[str, Any]:
"""
Run all lightweight updates with parallelization.
Returns statistics dictionary.
"""
start_time = datetime.now(timezone.utc)
logger.info("=" * 60)
logger.info("LIGHTWEIGHT HOURLY UPDATE (OPTIMIZED)")
logger.info(f"Started: {start_time.isoformat()}")
logger.info("=" * 60)
# 1. MP updates (sequential - modify same nodes)
logger.info("\n1. Updating MP data...")
self.stats["mps_updated"] = self.update_mp_parties()
cabinet_count = self.update_cabinet_positions()
# 2. Bills + Votes (parallel - independent operations)
logger.info("\n2. Updating bills and checking votes (parallel)...")
with ThreadPoolExecutor(max_workers=2) as executor:
bills_future = executor.submit(self.update_bills_from_legisinfo)
votes_future = executor.submit(self.check_recent_votes, 24)
# Get results with error handling
try:
bills_result = bills_future.result()
self.stats["bills_updated"] = bills_result.get("count", 0)
self.stats["new_bills"] = bills_result.get("new_bills", [])
self.stats["text_backfilled"] = bills_result.get("text_backfilled", 0)
except Exception as e:
logger.error(f"Error updating bills: {e}")
self.stats["bills_updated"] = 0
self.stats["new_bills"] = []
self.stats["text_backfilled"] = 0
try:
self.stats["new_votes"] = votes_future.result()
except Exception as e:
logger.error(f"Error checking votes: {e}")
self.stats["new_votes"] = 0
# 3. Debates (only if House is sitting)
logger.info("\n3. Checking for recent debates...")
lookback_days = self.should_run_debate_import()
if lookback_days > 0:
self.stats["debates"] = self.import_recent_debates(since_hours=lookback_days * 24)
else:
self.stats["debates"] = 0
# 4. Check for bill amendments (bills not yet checked)
logger.info("\n4. Checking for bill amendments...")
bills_to_check = self._get_bills_for_amendment_check()
if bills_to_check:
self.stats["amendments_detected"] = self.detect_amendments_for_bills(bills_to_check)
else:
logger.info("No bills pending amendment check")
self.stats["amendments_detected"] = 0
end_time = datetime.now(timezone.utc)
duration = (end_time - start_time).total_seconds()
logger.info("=" * 60)
logger.success("✅ LIGHTWEIGHT UPDATE COMPLETE")
logger.info(f"Duration: {duration:.1f} seconds")
logger.info(f"MPs updated: {self.stats['mps_updated']}")
logger.info(f"Party changes: {len(self.stats['party_changes'])}")
logger.info(f"Cabinet changes: {len(self.stats['cabinet_changes'])}")
logger.info(f"Bills updated: {self.stats['bills_updated']}")
logger.info(f"New bills detected: {len(self.stats['new_bills'])}")
logger.info(f"Text backfilled: {self.stats['text_backfilled']}")
logger.info(f"Bills with amendments: {self.stats['amendments_detected']}")
logger.info(f"New votes: {self.stats['new_votes']}")
logger.info(f"Debates imported: {self.stats['debates']}")
# Log any party changes prominently
if self.stats["party_changes"]:
logger.warning("=" * 60)
logger.warning("⚠️ PARTY CHANGES DETECTED:")
for change in self.stats["party_changes"]:
logger.warning(f" • {change['mp_name']}: {change['old_party']} → {change['new_party']}")
logger.warning("=" * 60)
# Log any cabinet changes
if self.stats["cabinet_changes"]:
logger.warning("=" * 60)
logger.warning("📋 CABINET CHANGES DETECTED:")
for change in self.stats["cabinet_changes"]:
if change["type"] == "appointment":
logger.warning(f" • NEW: {change['mp_name']} → {change['new_position']}")
elif change["type"] == "exit":
logger.warning(f" • EXIT: {change['mp_name']} (was {change['old_position']})")
elif change["type"] == "shuffle":
logger.warning(f" • SHUFFLE: {change['mp_name']}: {change['old_position']} → {change['new_position']}")
logger.warning("=" * 60)
# Log new bills
if self.stats["new_bills"]:
logger.info("=" * 60)
logger.info("NEW BILLS DETECTED (text ingested):")
for bill in self.stats["new_bills"]:
logger.info(f" • {bill['number']} ({bill['session']})")
logger.info("=" * 60)
logger.info("=" * 60)
return self.stats
def main():
"""Main entry point for Cloud Run job."""
# Load config from environment or .env file
config = Config()
# Connect to Neo4j
neo4j_client = Neo4jClient(
uri=config.neo4j_uri,
user=config.neo4j_user,
password=config.neo4j_password
)
try:
# Run lightweight updates
updater = LightweightUpdater(neo4j_client)
stats = updater.run_all()
# Return success
logger.success("Lightweight update completed successfully")
return 0
except Exception as e:
logger.error(f"Lightweight update failed: {e}")
import traceback
traceback.print_exc()
return 1
finally:
neo4j_client.close()
if __name__ == "__main__":
exit(main())