#!/usr/bin/env python3
"""
Activity Feed Ingestion Cloud Run Job
This job populates the Supabase activity_items table with parliamentary activity
data from Neo4j for the personalized activity feed feature.
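Features:
- Ingests vote activities (MP voted on bill/motion)
- Ingests bill update activities (bill status changes)
- Ingests committee meeting activities
- Ensures Party and Riding nodes, MEMBER_OF/REPRESENTS relationships, and the
  Opposition Leader role exist in Neo4j before ingesting
- Syncs suggested MPs for default feed content
- Deduplicates based on existing records
- Configurable lookback period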
Environment variables required:
- NEO4J_URI: Neo4j connection URI (default: bolt://10.128.0.3:7687)
- NEO4J_USERNAME: Neo4j username (default: neo4j)
- NEO4J_PASSWORD: Neo4j password
- SUPABASE_URL: Supabase project URL
- SUPABASE_SERVICE_ROLE_KEY: Supabase service role key
- ACTIVITY_LOOKBACK_DAYS: How far back to look for activity (default: 7)
"""
import sys
import os
from fedmcp_pipeline.utils.neo4j_client import Neo4jClient
from fedmcp_pipeline.utils.progress import logger
from fedmcp_pipeline.ingest.activity_feed import ActivityFeedIngester
from fedmcp_pipeline.ingest.opposition_leader import ingest_opposition_leader
from fedmcp_pipeline.ingest.political_entities import ingest_parties, ingest_ridings
from fedmcp_pipeline.relationships.political import build_political_structure
def _parse_lookback_days(raw_value, default=7):
    """Turn the raw ACTIVITY_LOOKBACK_DAYS setting into a day count.

    Args:
        raw_value: The setting as read from the environment (a string).
        default: Day count to fall back to when the setting is unusable.

    Returns:
        The parsed integer, or ``default`` when ``raw_value`` is not an
        integer or is negative. A warning is logged whenever the fallback
        is used.
    """
    try:
        days = int(raw_value)
    except ValueError:
        days = None

    # A negative window has no sensible meaning as "how far back to look",
    # so it is treated the same way as a non-numeric value instead of being
    # handed to the ingester.
    if days is None or days < 0:
        logger.warning(f"Invalid ACTIVITY_LOOKBACK_DAYS value: {raw_value}, using {default}")
        return default
    return days


def main():
    """Run activity feed ingestion job.

    Reads configuration from the environment, connects to Neo4j, makes sure
    the Party/Riding nodes, political relationships and Opposition Leader
    role are in place, syncs suggested MPs, and finally runs the activity
    ingestion through ``ActivityFeedIngester.ingest_all``.

    Exits the process with status 1 when:
    - NEO4J_PASSWORD, SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY is missing;
    - any step inside the ingestion phase raises an exception;
    - the ingestion reports errors and no inserted rows at all.

    Once the Neo4j client has been created it is always closed, including on
    the ``sys.exit`` paths (``SystemExit`` is not caught by
    ``except Exception`` and the ``finally`` clause still runs).
    """
    logger.info("=" * 80)
    logger.info("ACTIVITY FEED INGESTION CLOUD RUN JOB - STARTING")
    logger.info("=" * 80)
    print()

    # Get environment variables
    neo4j_uri = os.getenv('NEO4J_URI', 'bolt://10.128.0.3:7687')
    neo4j_user = os.getenv('NEO4J_USERNAME', 'neo4j')
    neo4j_password = os.getenv('NEO4J_PASSWORD')
    supabase_url = os.getenv('SUPABASE_URL')
    supabase_key = os.getenv('SUPABASE_SERVICE_ROLE_KEY')
    lookback_days = os.getenv('ACTIVITY_LOOKBACK_DAYS', '7')

    # Validate required environment variables (unset and empty both count
    # as missing). Dict order keeps the names in a stable order in the log.
    required = {
        "NEO4J_PASSWORD": neo4j_password,
        "SUPABASE_URL": supabase_url,
        "SUPABASE_SERVICE_ROLE_KEY": supabase_key,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        logger.error(f"Missing required environment variables: {', '.join(missing)}")
        sys.exit(1)

    # Parse lookback days (falls back to 7 on unusable input)
    lookback = _parse_lookback_days(lookback_days)

    logger.info("Configuration:")
    logger.info(f" Neo4j URI: {neo4j_uri}")
    # Only the first 50 characters of the URL are logged.
    logger.info(f" Supabase URL: {supabase_url[:50]}...")
    logger.info(f" Lookback days: {lookback}")
    print()

    logger.info(f"Connecting to Neo4j at {neo4j_uri}...")
    neo4j = Neo4jClient(uri=neo4j_uri, user=neo4j_user, password=neo4j_password)

    try:
        logger.info("Running activity feed ingestion...")
        logger.info("This will:")
        logger.info(" - Query Neo4j for recent votes, bill updates, committee meetings")
        logger.info(" - Transform data into activity_items format")
        logger.info(" - Insert into Supabase (skipping duplicates)")
        print()

        # Create ingester and run
        ingester = ActivityFeedIngester(
            neo4j_client=neo4j,
            supabase_url=supabase_url,
            supabase_key=supabase_key,
        )

        # Ensure Party and Riding nodes exist (extracted from MP data)
        # These are required for MEMBER_OF and REPRESENTS relationships
        logger.info("Ensuring Party and Riding nodes exist...")
        party_count = ingest_parties(neo4j)
        logger.info(f"Parties: {party_count} created/updated")
        riding_count = ingest_ridings(neo4j)
        logger.info(f"Ridings: {riding_count} created/updated")
        print()

        # Build political structure relationships (MEMBER_OF, REPRESENTS)
        # This is idempotent and ensures MPs are linked to Parties and Ridings
        # which is required for the suggested MPs query to work
        logger.info("Building political structure relationships...")
        pol_stats = build_political_structure(neo4j, batch_size=5000)
        logger.info(f"Political relationships: {pol_stats.get('member_of', 0)} MEMBER_OF, {pol_stats.get('represents', 0)} REPRESENTS")
        print()

        # Ensure Opposition Leader role exists (fetched from House Officers data)
        # This is needed for the suggested MPs query to find party leaders
        logger.info("Ensuring Opposition Leader role exists...")
        opp_count = ingest_opposition_leader(neo4j)
        logger.info(f"Opposition Leader roles: {opp_count} created/updated")
        print()

        # Sync suggested MPs for default feed content
        logger.info("Syncing suggested MPs for default feed content...")
        suggested_stats = ingester.sync_suggested_mps()
        # Read the counters defensively (same pattern as pol_stats above):
        # a missing key in this purely informational line must not abort
        # the job before the actual ingestion runs.
        logger.info(f"Suggested MPs synced: {suggested_stats.get('leaders', 0)} leaders, {suggested_stats.get('random', 0)} random")
        print()

        # Ingest activity data
        stats = ingester.ingest_all(
            lookback_days=lookback,
            skip_existing=True,
        )
        print()

        logger.info("=" * 80)
        logger.info("ACTIVITY FEED INGESTION CLOUD RUN JOB - COMPLETED")
        logger.info("=" * 80)
        print()

        # Exit with error code if there were significant errors: errors were
        # reported and not a single row was inserted across all categories.
        total_errors = sum(s.get("errors", 0) for s in stats.values())
        total_inserted = sum(s.get("inserted", 0) for s in stats.values())
        if total_errors > 0 and total_inserted == 0:
            logger.error("All inserts failed!")
            sys.exit(1)
    except Exception as e:
        # Top-level boundary: report the failure, dump the traceback and
        # signal failure through the exit status.
        logger.error(f"Activity feed ingestion job failed: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        neo4j.close()
# Script entry point: run the job only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()