"""
Activity Feed Ingestion Module.
Populates the Supabase activity_items table with parliamentary activity data
from Neo4j for the personalized activity feed feature.
Activity Types:
- vote: MP voted on a bill/motion
- bill_update: Bill status changed (reading passed, etc.)
- committee_meeting: Committee meeting held
Entity Types (must match bookmark item_type for feed filtering):
- mp: MP activities (entity_id = MP slug like "pierre-poilievre")
- bill: Bill activities (entity_id = "{session}-{number}" like "45-1-C-11")
- committee: Committee activities (entity_id = committee code like "HUMA")
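
Example usage (a sketch; assumes SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY are
set in the environment and that ``client`` is an already-configured
Neo4jClient):

    ingester = ActivityFeedIngester(neo4j_client=client)
    stats = ingester.ingest_all(lookback_days=7)
    ingester.sync_suggested_mps()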
"""
import os
import json
from datetime import datetime, timedelta
from typing import Dict, Optional
from uuid import uuid4
from supabase import create_client, Client
from ..utils.neo4j_client import Neo4jClient
from ..utils.progress import logger
# ============================================
# Global Event Detection
# ============================================
# Events that should be shown to all users, not just those with bookmarks
GLOBAL_BILL_EVENTS = [
# Major milestones
'royal assent',
'receives royal assent',
'received royal assent',
'sanction royale',
# Third reading
'third reading',
'passed at third reading',
'third reading passed',
'troisième lecture',
# Senate passage
'passed in senate',
'sent to senate',
'passed by senate',
# Second reading (significant)
'second reading passed',
'passed at second reading',
'deuxième lecture adoptée',
]
def is_global_bill_event(latest_event: str) -> bool:
"""Check if a bill event should be shown to all users."""
if not latest_event:
return False
event_lower = latest_event.lower()
return any(global_event in event_lower for global_event in GLOBAL_BILL_EVENTS)
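
# Illustrative behavior of the substring match above (examples, not real data):
#   is_global_bill_event("Bill C-69 received royal assent")   -> True
#   is_global_bill_event("Troisième lecture et adoption")     -> True
#   is_global_bill_event("Introduction and first reading")    -> False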
class ActivityFeedIngester:
"""
Ingests parliamentary activity data from Neo4j into Supabase activity_items table.
This ingester:
1. Queries Neo4j for recent votes, bill updates, and committee meetings
2. Transforms them into activity_items format
3. Inserts into Supabase (skipping duplicates based on unique keys)
"""
def __init__(
self,
neo4j_client: Neo4jClient,
supabase_url: Optional[str] = None,
supabase_key: Optional[str] = None,
):
"""
Initialize Activity Feed Ingester.
Args:
neo4j_client: Neo4j client instance
supabase_url: Supabase project URL (or from env)
supabase_key: Supabase service role key (or from env)
"""
self.neo4j = neo4j_client
# Get Supabase credentials
self.supabase_url = supabase_url or os.getenv('SUPABASE_URL')
self.supabase_key = supabase_key or os.getenv('SUPABASE_SERVICE_ROLE_KEY')
if not self.supabase_url or not self.supabase_key:
raise ValueError(
"Supabase credentials required. Set SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY "
"environment variables or pass them to the constructor."
)
self.supabase: Client = create_client(self.supabase_url, self.supabase_key)
def ingest_all(
self,
lookback_days: int = 7,
skip_existing: bool = True,
    ) -> Dict[str, Dict[str, int]]:
"""
Ingest all activity types from Neo4j into Supabase.
Args:
lookback_days: How far back to look for activity (default: 7 days)
skip_existing: Skip activities that already exist (default: True)
Returns:
            Dict mapping each activity type to its ingestion statistics
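
        Example return value (counts illustrative):

            {
                "votes": {"inserted": 120, "skipped": 4, "errors": 0},
                "bill_updates": {"inserted": 12, "skipped": 1, "errors": 0},
                "committee_meetings": {"inserted": 8, "skipped": 0, "errors": 0},
            }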
"""
logger.info(f"Ingesting activity feed data (lookback: {lookback_days} days)...")
        stats: Dict[str, Dict[str, int]] = {}
cutoff_date = datetime.now() - timedelta(days=lookback_days)
# Get existing activity keys if skipping
existing_keys = set()
if skip_existing:
existing_keys = self._get_existing_activity_keys(cutoff_date)
logger.info(f"Found {len(existing_keys)} existing activity items in date range")
        # Ingest each activity type
        stats["votes"] = self._ingest_votes(cutoff_date, existing_keys)
        stats["bill_updates"] = self._ingest_bill_updates(cutoff_date, existing_keys)
        stats["committee_meetings"] = self._ingest_committee_meetings(cutoff_date, existing_keys)
# Summary
total_inserted = sum(s["inserted"] for s in stats.values())
total_skipped = sum(s["skipped"] for s in stats.values())
total_errors = sum(s["errors"] for s in stats.values())
logger.success(f"✅ Activity feed ingestion complete:")
logger.info(f" Votes: {stats['votes']['inserted']} inserted, {stats['votes']['skipped']} skipped")
logger.info(f" Bill updates: {stats['bill_updates']['inserted']} inserted, {stats['bill_updates']['skipped']} skipped")
logger.info(f" Committee meetings: {stats['committee_meetings']['inserted']} inserted, {stats['committee_meetings']['skipped']} skipped")
logger.info(f" Total: {total_inserted} inserted, {total_skipped} skipped, {total_errors} errors")
return stats
def _get_existing_activity_keys(self, cutoff_date: datetime) -> set:
"""Get set of existing activity unique keys to avoid duplicates."""
try:
result = self.supabase.table("activity_items").select(
"activity_type, entity_type, entity_id, occurred_at"
).gte(
"occurred_at", cutoff_date.isoformat()
).execute()
# Create unique keys for each existing activity
keys = set()
for item in result.data:
key = f"{item['activity_type']}:{item['entity_type']}:{item['entity_id']}:{item['occurred_at']}"
keys.add(key)
return keys
except Exception as e:
logger.warning(f"Error fetching existing activities: {e}")
return set()
def _make_activity_key(self, activity_type: str, entity_type: str, entity_id: str, occurred_at: str) -> str:
"""Create a unique key for an activity item."""
return f"{activity_type}:{entity_type}:{entity_id}:{occurred_at}"
def _ingest_votes(self, cutoff_date: datetime, existing_keys: set) -> Dict[str, int]:
"""
Ingest vote activities.
Creates one activity per MP per vote (so users can see their bookmarked MPs' votes).
"""
stats = {"inserted": 0, "skipped": 0, "errors": 0}
logger.info("Querying Neo4j for recent votes...")
# Query recent ballots with vote and MP info
query = """
MATCH (b:Ballot)-[:CAST_IN]->(v:Vote)
MATCH (b)-[:CAST_BY]->(mp:MP)
WHERE v.date_time >= $cutoff_date OR v.date >= $cutoff_date
RETURN
mp.id AS mp_id,
mp.name AS mp_name,
b.vote_value AS vote_value,
COALESCE(v.vote_number, toInteger(v.id)) AS vote_number,
v.parliament_number AS parliament_number,
v.session_number AS session_number,
COALESCE(v.subject, v.description) AS subject,
v.bill_number AS bill_number,
v.result AS vote_result,
COALESCE(v.date_time, v.date) AS vote_date
ORDER BY vote_date DESC
LIMIT 5000
"""
try:
results = self.neo4j.run_query(query, {"cutoff_date": cutoff_date.isoformat()})
logger.info(f"Found {len(results)} ballot records")
except Exception as e:
logger.error(f"Error querying votes: {e}")
stats["errors"] += 1
return stats
activities_to_insert = []
for row in results:
try:
# Parse vote date
vote_date = row.get("vote_date")
if vote_date is None:
continue
# Handle both datetime and date objects
if hasattr(vote_date, 'isoformat'):
occurred_at = vote_date.isoformat()
else:
occurred_at = str(vote_date)
# Build activity
mp_id = row.get("mp_id")
if not mp_id:
continue
                # Check if already exists. Keys loaded from Supabase carry no
                # vote_number suffix, so test the base key as well; the suffix
                # keeps two same-timestamp votes by one MP distinct within this run.
                base_key = self._make_activity_key("vote", "mp", mp_id, occurred_at)
                vote_number = row.get("vote_number")
                activity_key = f"{base_key}:{vote_number}"
                if activity_key in existing_keys or base_key in existing_keys:
stats["skipped"] += 1
continue
# Build session string
parliament = row.get("parliament_number")
session = row.get("session_number")
session_str = f"{parliament}-{session}" if parliament and session else None
                # Build title. Neo4j returns null properties as None with the key
                # present, so dict.get() defaults never fire; use `or` fallbacks.
                mp_name = row.get("mp_name") or "MP"
                vote_value = row.get("vote_value") or "voted"
                subject = row.get("subject") or "a motion"
title = f"{mp_name} voted {vote_value}"
# Truncate subject for description
description = subject[:200] + "..." if subject and len(subject) > 200 else subject
activity = {
"id": str(uuid4()),
"activity_type": "vote",
"entity_type": "mp",
"entity_id": mp_id,
"title": title,
"description": description,
"metadata": json.dumps({
"mp_name": mp_name,
"vote_number": vote_number,
"vote_value": vote_value,
"vote_result": row.get("vote_result"),
"session": session_str,
"bill_number": row.get("bill_number"),
"subject": subject,
}),
"occurred_at": occurred_at,
"is_global": True, # All votes are global parliamentary events
}
activities_to_insert.append(activity)
existing_keys.add(activity_key) # Mark as seen
except Exception as e:
logger.warning(f"Error processing vote row: {e}")
stats["errors"] += 1
# Batch insert with upsert to handle duplicates gracefully
if activities_to_insert:
try:
# Insert in batches of 100, using upsert to skip duplicates
batch_size = 100
for i in range(0, len(activities_to_insert), batch_size):
batch = activities_to_insert[i:i + batch_size]
                    # Upsert keyed on id with ignore_duplicates=True so conflicting
                    # rows become a no-op; cross-run dedup is handled above via
                    # existing_keys
self.supabase.table("activity_items").upsert(
batch,
on_conflict="id",
ignore_duplicates=True
).execute()
stats["inserted"] += len(batch)
logger.info(f"Inserted {stats['inserted']} vote activities...")
except Exception as e:
logger.error(f"Error inserting vote activities: {e}")
stats["errors"] += len(activities_to_insert)
logger.info(f"Vote ingestion complete: {stats['inserted']} inserted, {stats['skipped']} skipped")
return stats
def _ingest_bill_updates(self, cutoff_date: datetime, existing_keys: set) -> Dict[str, int]:
"""
Ingest bill update activities.
Creates activities for bills that have been updated recently.
"""
stats = {"inserted": 0, "skipped": 0, "errors": 0}
logger.info("Querying Neo4j for recent bill updates...")
# Query bills with recent updates
query = """
MATCH (b:Bill)
WHERE b.updated_at >= $cutoff_date
RETURN
b.number AS bill_number,
b.session AS session,
b.title AS title,
b.status AS status,
b.stage AS stage,
b.latest_event AS latest_event,
b.sponsor_name AS sponsor_name,
b.updated_at AS updated_at
ORDER BY b.updated_at DESC
LIMIT 500
"""
try:
results = self.neo4j.run_query(query, {"cutoff_date": cutoff_date.isoformat()})
logger.info(f"Found {len(results)} bill updates")
except Exception as e:
logger.error(f"Error querying bills: {e}")
stats["errors"] += 1
return stats
activities_to_insert = []
for row in results:
try:
updated_at = row.get("updated_at")
if updated_at is None:
continue
if hasattr(updated_at, 'isoformat'):
occurred_at = updated_at.isoformat()
else:
occurred_at = str(updated_at)
bill_number = row.get("bill_number")
session = row.get("session")
if not bill_number or not session:
continue
# Entity ID format: "{session}-{number}" to match bookmark format
entity_id = f"{session}-{bill_number}"
# Check if already exists
activity_key = self._make_activity_key("bill_update", "bill", entity_id, occurred_at)
if activity_key in existing_keys:
stats["skipped"] += 1
continue
# Build title
latest_event = row.get("latest_event") or row.get("status") or "Updated"
title = f"Bill {bill_number.upper()}: {latest_event}"
                # Description (guard against a null title: Neo4j returns the key
                # with a None value, so a dict.get() default would not fire)
                bill_title = row.get("title") or ""
description = bill_title[:200] + "..." if len(bill_title) > 200 else bill_title
# Check if this is a global parliamentary event
is_global = is_global_bill_event(latest_event)
activity = {
"id": str(uuid4()),
"activity_type": "bill_update",
"entity_type": "bill",
"entity_id": entity_id,
"title": title,
"description": description,
"metadata": json.dumps({
"session": session,
"bill_number": bill_number,
"status": row.get("status"),
"stage": row.get("stage"),
"latest_event": latest_event,
"sponsor_name": row.get("sponsor_name"),
}),
"occurred_at": occurred_at,
"is_global": is_global, # True for major bill milestones
}
activities_to_insert.append(activity)
existing_keys.add(activity_key)
except Exception as e:
logger.warning(f"Error processing bill row: {e}")
stats["errors"] += 1
# Batch insert with upsert to handle duplicates gracefully
if activities_to_insert:
try:
batch_size = 100
for i in range(0, len(activities_to_insert), batch_size):
batch = activities_to_insert[i:i + batch_size]
                    # Upsert keyed on id with ignore_duplicates=True so conflicting
                    # rows become a no-op; cross-run dedup is handled above via
                    # existing_keys
self.supabase.table("activity_items").upsert(
batch,
on_conflict="id",
ignore_duplicates=True
).execute()
stats["inserted"] += len(batch)
except Exception as e:
logger.error(f"Error inserting bill activities: {e}")
stats["errors"] += len(activities_to_insert)
logger.info(f"Bill update ingestion complete: {stats['inserted']} inserted, {stats['skipped']} skipped")
return stats
def sync_suggested_mps(self) -> Dict[str, int]:
"""
Sync suggested MPs from Neo4j to Supabase for default feed content.
Syncs:
- Party leaders (PM, Opposition Leader, party leaders) - priority 1-10
- Random MPs by province for geographic variety - priority 100+
Returns:
Dict with sync statistics
"""
stats = {"leaders": 0, "random": 0, "errors": 0}
logger.info("Syncing suggested MPs from Neo4j to Supabase...")
# Query party leaders from Neo4j (with riding/province info)
leaders_query = """
MATCH (mp:MP)-[:HOLDS_ROLE]->(role:Role)
WHERE role.is_current = true
AND role.role_type IN ['Prime Minister', 'Party Leader', 'Opposition Leader']
AND mp.current = true
MATCH (mp)-[:MEMBER_OF]->(party:Party)
OPTIONAL MATCH (mp)-[:REPRESENTS]->(riding:Riding)
RETURN DISTINCT
mp.id AS mp_id,
mp.name AS mp_name,
party.code AS party_code,
party.name AS party_name,
riding.province AS province,
riding.name AS riding_name,
role.role_type AS role_type,
role.title AS role_title
        ORDER BY
            CASE role_type
                WHEN 'Prime Minister' THEN 1
                WHEN 'Opposition Leader' THEN 2
                ELSE 3
            END
"""
# Query random MPs from each province (2 per province for variety)
random_query = """
MATCH (mp:MP)-[:MEMBER_OF]->(party:Party)
MATCH (mp)-[:REPRESENTS]->(riding:Riding)
WHERE mp.current = true
AND party.code IN ['LPC', 'CPC', 'NDP', 'BQ', 'GPC']
WITH riding.province AS province, mp, party, riding, rand() AS r
ORDER BY province, r
WITH province, COLLECT({mp: mp, party: party, riding: riding})[0..2] AS mps_by_province
UNWIND mps_by_province AS rec
RETURN
rec.mp.id AS mp_id,
rec.mp.name AS mp_name,
rec.party.code AS party_code,
rec.party.name AS party_name,
rec.riding.province AS province,
rec.riding.name AS riding_name
"""
try:
# Clear existing suggested_mps table
logger.info("Clearing existing suggested_mps table...")
self.supabase.table("suggested_mps").delete().neq("mp_id", "").execute()
# Insert leaders (priority 1-10)
leaders = self.neo4j.run_query(leaders_query)
logger.info(f"Found {len(leaders)} party leaders")
leader_ids = set()
for i, leader in enumerate(leaders):
if not leader.get("mp_id"):
continue
leader_ids.add(leader["mp_id"])
try:
self.supabase.table("suggested_mps").upsert({
"mp_id": leader["mp_id"],
"mp_name": leader.get("mp_name", ""),
"party_code": leader.get("party_code"),
"party_name": leader.get("party_name"),
"province": leader.get("province"),
"riding_name": leader.get("riding_name"),
"suggestion_type": "party_leader",
"role_title": leader.get("role_title"),
"priority": i + 1,
}).execute()
stats["leaders"] += 1
except Exception as e:
logger.warning(f"Error inserting leader {leader.get('mp_name')}: {e}")
stats["errors"] += 1
# Insert random MPs by province (priority 100+)
randoms = self.neo4j.run_query(random_query)
logger.info(f"Found {len(randoms)} random MPs by province")
for i, mp in enumerate(randoms):
if not mp.get("mp_id") or mp["mp_id"] in leader_ids:
continue # Skip if no ID or if already a leader
try:
self.supabase.table("suggested_mps").upsert({
"mp_id": mp["mp_id"],
"mp_name": mp.get("mp_name", ""),
"party_code": mp.get("party_code"),
"party_name": mp.get("party_name"),
"province": mp.get("province"),
"riding_name": mp.get("riding_name"),
"suggestion_type": "random",
"priority": 100 + i,
}).execute()
stats["random"] += 1
except Exception as e:
logger.warning(f"Error inserting random MP {mp.get('mp_name')}: {e}")
stats["errors"] += 1
logger.success(f"Suggested MPs sync complete: {stats['leaders']} leaders, {stats['random']} random")
except Exception as e:
logger.error(f"Error syncing suggested MPs: {e}")
stats["errors"] += 1
return stats
def _ingest_committee_meetings(self, cutoff_date: datetime, existing_keys: set) -> Dict[str, int]:
"""
Ingest committee meeting activities.
Creates activities for recent committee meetings.
"""
stats = {"inserted": 0, "skipped": 0, "errors": 0}
logger.info("Querying Neo4j for recent committee meetings...")
# Query recent meetings with committee info
query = """
MATCH (c:Committee)-[:HELD_MEETING]->(m:Meeting)
WHERE m.date >= $cutoff_date
RETURN
c.code AS committee_code,
c.name AS committee_name,
m.number AS meeting_number,
m.date AS meeting_date,
m.subject AS subject,
m.status AS status,
m.in_camera AS in_camera
ORDER BY m.date DESC
LIMIT 500
"""
try:
results = self.neo4j.run_query(query, {"cutoff_date": cutoff_date.date().isoformat()})
logger.info(f"Found {len(results)} committee meetings")
except Exception as e:
logger.error(f"Error querying committee meetings: {e}")
stats["errors"] += 1
return stats
activities_to_insert = []
for row in results:
try:
meeting_date = row.get("meeting_date")
if meeting_date is None:
continue
if hasattr(meeting_date, 'isoformat'):
occurred_at = meeting_date.isoformat()
else:
occurred_at = str(meeting_date)
# Add time component if missing (default to 10:00 AM for meetings)
if "T" not in occurred_at:
occurred_at = f"{occurred_at}T10:00:00"
committee_code = row.get("committee_code")
if not committee_code:
continue
                # Check if already exists. Keys loaded from Supabase carry no
                # meeting_number suffix, so test the base key as well.
                meeting_number = row.get("meeting_number")
                base_key = self._make_activity_key("committee_meeting", "committee", committee_code, occurred_at)
                activity_key = f"{base_key}:{meeting_number}"  # Make unique per meeting
                if activity_key in existing_keys or base_key in existing_keys:
stats["skipped"] += 1
continue
# Build title
                committee_name = row.get("committee_name") or committee_code
title = f"{committee_name} Meeting #{meeting_number}" if meeting_number else f"{committee_name} Meeting"
# Description from subject
                subject = row.get("subject") or ""
description = subject[:200] + "..." if len(subject) > 200 else subject
activity = {
"id": str(uuid4()),
"activity_type": "committee_meeting",
"entity_type": "committee",
"entity_id": committee_code,
"title": title,
"description": description,
"metadata": json.dumps({
"committee_code": committee_code,
"committee_name": committee_name,
"meeting_number": meeting_number,
"subject": subject,
"status": row.get("status"),
"in_camera": row.get("in_camera", False),
}),
"occurred_at": occurred_at,
"is_global": False, # Committee meetings are not global events
}
activities_to_insert.append(activity)
existing_keys.add(activity_key)
except Exception as e:
logger.warning(f"Error processing meeting row: {e}")
stats["errors"] += 1
# Batch insert with upsert to handle duplicates gracefully
if activities_to_insert:
try:
batch_size = 100
for i in range(0, len(activities_to_insert), batch_size):
batch = activities_to_insert[i:i + batch_size]
                    # Upsert keyed on id with ignore_duplicates=True so conflicting
                    # rows become a no-op; cross-run dedup is handled above via
                    # existing_keys
self.supabase.table("activity_items").upsert(
batch,
on_conflict="id",
ignore_duplicates=True
).execute()
stats["inserted"] += len(batch)
except Exception as e:
logger.error(f"Error inserting committee activities: {e}")
stats["errors"] += len(activities_to_insert)
logger.info(f"Committee meeting ingestion complete: {stats['inserted']} inserted, {stats['skipped']} skipped")
return stats
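
# ---------------------------------------------------------------------------
# Example invocation (a sketch): assumes this module is run as part of its
# package (e.g. `python -m <package>.activity_feed`), that SUPABASE_URL and
# SUPABASE_SERVICE_ROLE_KEY are set, and that Neo4jClient() can be constructed
# with no arguments; adjust to however the client is configured here.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    client = Neo4jClient()  # assumption: no-arg constructor
    ingester = ActivityFeedIngester(neo4j_client=client)
    ingester.sync_suggested_mps()
    summary = ingester.ingest_all(lookback_days=7)
    print(json.dumps(summary, indent=2))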