"""
News Integration Pipeline.
Ingests Canadian political news from RSS feeds, extracts entity mentions (MPs, bills,
committees), stores articles in Neo4j, and creates activity feed items in Supabase.
Data Flow:
1. Fetch RSS feeds from Canadian political news sources
2. Parse articles, extract entities (MPs, bills, committees)
3. Store NewsArticle nodes in Neo4j with MENTIONED_IN relationships
4. Create activity_items in Supabase for users who bookmarked mentioned entities
News Sources:
- CBC Politics
- Globe & Mail Politics
- National Post Politics
- CTV News Politics
- iPolitics
- LEGISinfo Bill Events
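Example (illustrative; assumes SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY are set):
    pipeline = NewsIngestionPipeline(Neo4jClient(uri=uri, user=user, password=password))
    stats = pipeline.run(limit=5, dry_run=True)  # fetch a few articles, persist nothing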
"""
import os
import re
import hashlib
import unicodedata
import socket
import urllib.request
import urllib.error
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Set, Tuple
from dataclasses import dataclass
import feedparser
from supabase import create_client, Client
from ..utils.neo4j_client import Neo4jClient
from ..utils.progress import logger
from .news_scoring import (
score_article,
store_article_score,
ExtractedEntities as ScoringEntities,
)
# Set default socket timeout for all urllib requests (30 seconds)
socket.setdefaulttimeout(30)
# ============================================
# Data Classes
# ============================================
@dataclass
class NewsArticle:
"""Parsed news article from RSS feed."""
url: str
title: str
summary: str
source_id: str
published_at: Optional[datetime]
image_url: Optional[str]
@dataclass
class ExtractedEntities:
"""Entities extracted from article text."""
mp_ids: List[str] # parl_mp_ids
bill_ids: List[str] # Bill numbers like "C-11", "S-233"
committee_codes: List[str] # Committee codes like "FINA", "JUST"
# ============================================
# Entity Extraction Patterns
# ============================================
# Bill number pattern: C-11, S-233, C-234A (with optional "Bill" prefix)
BILL_PATTERN = re.compile(
r'\b(?:Bill\s+)?([CS])-(\d+[A-Z]?)\b',
re.IGNORECASE
)
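# Illustrative: BILL_PATTERN.findall("Bill C-11 and S-233 head to committee")
# returns [("C", "11"), ("S", "233")]; _extract_bill_ids() below joins and
# upper-cases each pair into "C-11", "S-233".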
# Committee keywords to codes mapping
COMMITTEE_KEYWORDS = {
'FINA': ['finance', 'budget', 'fiscal', 'treasury', 'taxation'],
'JUST': ['justice', 'legal', 'criminal', 'judiciary'],
'ETHI': ['ethics', 'conflict of interest', 'lobbying', 'integrity'],
'ENVI': ['environment', 'climate', 'carbon', 'emissions', 'pollution'],
'HUMA': ['human resources', 'employment', 'labour', 'social development'],
'TRAN': ['transport', 'infrastructure', 'highways', 'rail', 'aviation'],
'HESA': ['health', 'pandemic', 'medical', 'healthcare', 'drugs'],
'INDU': ['industry', 'technology', 'innovation', 'science'],
'AGRI': ['agriculture', 'farming', 'agri-food', 'rural'],
'NDDN': ['defence', 'military', 'armed forces', 'nato', 'veterans'],
'FAAE': ['foreign affairs', 'international', 'diplomacy', 'aid'],
'CIMM': ['immigration', 'refugees', 'citizenship', 'border'],
'INAN': ['indigenous', 'first nations', 'métis', 'inuit', 'aboriginal'],
'FOPO': ['fisheries', 'oceans', 'marine', 'coast guard'],
'PROC': ['procedure', 'house affairs', 'elections', 'parliamentary'],
'OGGO': ['government operations', 'procurement', 'public service'],
'SECU': ['public safety', 'security', 'rcmp', 'terrorism', 'crime'],
'CHPC': ['heritage', 'culture', 'broadcasting', 'arts', 'sport'],
}
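# Illustrative: _extract_committee_mentions() lower-cases the article text and does a
# substring check against these keywords, so "Ottawa unveils new carbon pricing rules"
# maps to ['ENVI'] (via "carbon"), and a single article can match several committees.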
# Honorific patterns to remove from MP names
HONORIFIC_PATTERN = re.compile(
r'\b(Hon\.|Rt\.\s*Hon\.|The\s+Honourable|L\'hon\.)\s*',
re.IGNORECASE
)
# ============================================
# Name Normalization (from hansard.py patterns)
# ============================================
def normalize_name(name: str) -> str:
"""
Normalize a name for fuzzy matching.
- Removes accents/diacritics
- Converts to lowercase
- Removes honorifics
- Removes extra whitespace
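Examples (illustrative):
    "Hon. Mélanie Joly"       -> "melanie joly"
    "Rt. Hon. Justin Trudeau" -> "justin trudeau"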
"""
if not name:
return ""
# Remove honorifics
name = HONORIFIC_PATTERN.sub('', name)
# Remove accents: é → e, è → e, ñ → n, etc.
name = ''.join(
char for char in unicodedata.normalize('NFD', name)
if unicodedata.category(char) != 'Mn'
)
# Remove periods (middle initials)
name = name.replace('.', '')
# Lowercase and normalize whitespace
name = ' '.join(name.lower().split())
return name
# Common nickname mappings
NICKNAME_MAPPING = {
'bobby': 'robert', 'rob': 'robert', 'bob': 'robert',
'bill': 'william', 'dick': 'richard',
'jim': 'james', 'joe': 'joseph',
'mike': 'michael', 'tony': 'anthony',
}
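# Sketch (hypothetical, not wired into _load_mp_cache here): the mapping could add
# nickname variants of each MP's given name to the index, e.g.:
#   for nick, formal in NICKNAME_MAPPING.items():
#       if given and family and given.split()[0] == formal:
#           mp_name_index[f"{nick} {family}"] = mp_id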
# ============================================
# News Ingestion Pipeline
# ============================================
class NewsIngestionPipeline:
"""
Ingests news from RSS feeds with entity extraction.
Stores articles in Neo4j and creates activity feed items in Supabase.
"""
def __init__(
self,
neo4j_client: Neo4jClient,
supabase_url: Optional[str] = None,
supabase_key: Optional[str] = None,
):
"""
Initialize News Ingestion Pipeline.
Args:
neo4j_client: Neo4j client instance
supabase_url: Supabase project URL (or from env)
supabase_key: Supabase service role key (or from env)
"""
self.neo4j = neo4j_client
# Get Supabase credentials
self.supabase_url = supabase_url or os.getenv('SUPABASE_URL')
self.supabase_key = supabase_key or os.getenv('SUPABASE_SERVICE_ROLE_KEY')
if not self.supabase_url or not self.supabase_key:
raise ValueError(
"Supabase credentials required. Set SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY "
"environment variables."
)
self.supabase: Client = create_client(self.supabase_url, self.supabase_key)
# Cache MPs for entity extraction
self.mp_cache: Dict[str, Dict] = {}
self.mp_name_index: Dict[str, str] = {} # normalized_name -> mp_id
def run(
self,
source_id: Optional[str] = None,
limit: Optional[int] = None,
dry_run: bool = False,
) -> Dict[str, Any]:
"""
Run the news ingestion pipeline.
Args:
source_id: Specific source to ingest (None = all active sources)
limit: Maximum articles per source (for testing)
dry_run: If True, don't persist to databases
Returns:
Dict with ingestion statistics
"""
logger.info("Starting news ingestion pipeline...")
# Load MP cache for entity extraction
self._load_mp_cache()
# Get active news sources
sources = self._get_active_sources(source_id)
logger.info(f"Processing {len(sources)} news source(s)")
stats = {
"sources_processed": 0,
"articles_fetched": 0,
"articles_new": 0,
"articles_stored": 0,
"articles_scored": 0,
"entities_extracted": 0,
"activity_items_created": 0,
"errors": [],
}
for source in sources:
try:
source_stats = self._process_source(source, limit, dry_run)
stats["sources_processed"] += 1
stats["articles_fetched"] += source_stats["fetched"]
stats["articles_new"] += source_stats["new"]
stats["articles_stored"] += source_stats["stored"]
stats["articles_scored"] += source_stats.get("scored", 0)
stats["entities_extracted"] += source_stats["entities"]
stats["activity_items_created"] += source_stats["activity_items"]
except Exception as e:
logger.error(f"Error processing source {source['id']}: {e}")
stats["errors"].append({"source": source["id"], "error": str(e)})
logger.info(
f"News ingestion complete: {stats['articles_new']} new articles, "
f"{stats['articles_scored']} scored, "
f"{stats['entities_extracted']} entities extracted, "
f"{stats['activity_items_created']} activity items created"
)
return stats
def _load_mp_cache(self) -> None:
"""Load all MPs from Neo4j for entity extraction."""
logger.info("Loading MP cache from Neo4j...")
result = self.neo4j.run_query("""
MATCH (mp:MP)
WHERE mp.current = true
RETURN mp.id AS id, mp.name AS name, mp.given_name AS given_name,
mp.family_name AS family_name, mp.parl_mp_id AS parl_mp_id
""")
for row in result:
mp_id = row["id"]
self.mp_cache[mp_id] = row
# Build name index for matching
full_name = normalize_name(row["name"] or "")
if full_name:
self.mp_name_index[full_name] = mp_id
# Also index given + family name variations
given = normalize_name(row["given_name"] or "")
family = normalize_name(row["family_name"] or "")
if given and family:
self.mp_name_index[f"{given} {family}"] = mp_id
self.mp_name_index[f"{family} {given}"] = mp_id
# First name only + last name
first_only = given.split()[0] if given else ""
if first_only:
self.mp_name_index[f"{first_only} {family}"] = mp_id
logger.info(f"Loaded {len(self.mp_cache)} MPs, {len(self.mp_name_index)} name variations")
def _get_active_sources(self, source_id: Optional[str] = None) -> List[Dict]:
"""Get active news sources from Supabase."""
query = self.supabase.table("news_sources").select("*").eq("is_active", True)
if source_id:
query = query.eq("id", source_id)
response = query.execute()
return response.data or []
def _process_source(
self,
source: Dict,
limit: Optional[int],
dry_run: bool,
) -> Dict[str, int]:
"""Process a single news source."""
logger.info(f"Processing source: {source['name']} ({source['id']})")
stats = {"fetched": 0, "new": 0, "stored": 0, "scored": 0, "entities": 0, "activity_items": 0}
# Fetch RSS feed with timeout
try:
logger.info(f"Fetching RSS from {source['rss_url']}...")
# Use urllib to fetch with explicit timeout, then parse
req = urllib.request.Request(
source["rss_url"],
headers={'User-Agent': 'CanadaGPT/1.0 News Aggregator'}
)
with urllib.request.urlopen(req, timeout=30) as response:
feed_content = response.read()
feed = feedparser.parse(feed_content)
if feed.bozo:
logger.warning(f"RSS parse warning for {source['id']}: {feed.bozo_exception}")
except socket.timeout:
logger.error(f"Timeout fetching RSS for {source['id']} (30s)")
raise
except urllib.error.URLError as e:
logger.error(f"URL error fetching RSS for {source['id']}: {e}")
raise
except Exception as e:
logger.error(f"Failed to fetch RSS for {source['id']}: {e}")
raise
articles = feed.entries[:limit] if limit else feed.entries
stats["fetched"] = len(articles)
logger.info(f"Fetched {len(articles)} articles from {source['name']}")
# Get existing URLs for deduplication
existing_urls = self._get_existing_urls(source["id"])
for entry in articles:
url = entry.get("link", "")
if not url or url in existing_urls:
continue
stats["new"] += 1
# Parse article
article = self._parse_rss_entry(entry, source["id"])
if not article:
continue
# Extract entities
entities = self._extract_entities(article)
entity_count = len(entities.mp_ids) + len(entities.bill_ids) + len(entities.committee_codes)
stats["entities"] += entity_count
if dry_run:
logger.info(f"[DRY RUN] Would store: {article.title[:50]}... ({entity_count} entities)")
continue
# Store in Neo4j
try:
self._store_neo4j_article(article, entities)
stats["stored"] += 1
except Exception as e:
logger.error(f"Failed to store article in Neo4j: {e}")
continue
# Create activity items in Supabase
try:
items_created = self._create_activity_items(article, entities)
stats["activity_items"] += items_created
except Exception as e:
logger.error(f"Failed to create activity items: {e}")
# Calculate and store relevance score
try:
scoring_entities = ScoringEntities(
mp_ids=entities.mp_ids,
bill_ids=entities.bill_ids,
committee_codes=entities.committee_codes,
)
article_score = score_article(
title=article.title,
summary=article.summary,
source_id=article.source_id,
published_at=article.published_at,
entities=scoring_entities,
)
store_article_score(self.supabase, article.url, article_score)
stats["scored"] += 1
logger.debug(
f"Article score: {article_score.total_score:.1f} "
f"(entity={article_score.entity_score}, source={article_score.source_score}, "
f"recency={article_score.recency_score:.1f}, breaking={article_score.breaking_score})"
)
except Exception as e:
logger.warning(f"Failed to calculate/store article score: {e}")
# Mark as processed
self._mark_processed(article, source["id"], entity_count)
# Update source timestamp
if not dry_run:
self._update_source_timestamp(source["id"])
return stats
def _parse_rss_entry(self, entry: Dict, source_id: str) -> Optional[NewsArticle]:
"""Parse an RSS entry into a NewsArticle."""
url = entry.get("link", "")
title = entry.get("title", "")
summary = entry.get("summary", entry.get("description", ""))
if not url or not title:
return None
# Clean summary (strip HTML, truncate)
summary = self._clean_html(summary)[:500]
# Parse published date
published_at = None
if "published_parsed" in entry and entry["published_parsed"]:
try:
published_at = datetime(*entry["published_parsed"][:6], tzinfo=timezone.utc)
except Exception:
pass
# Get image URL
image_url = None
if "media_content" in entry and entry["media_content"]:
image_url = entry["media_content"][0].get("url")
elif "enclosures" in entry and entry["enclosures"]:
for enc in entry["enclosures"]:
if enc.get("type", "").startswith("image/"):
image_url = enc.get("href")
break
return NewsArticle(
url=url,
title=title,
summary=summary,
source_id=source_id,
published_at=published_at,
image_url=image_url,
)
def _clean_html(self, text: str) -> str:
"""Remove HTML tags from text."""
if not text:
return ""
text = re.sub(r'<[^>]+>', '', text)
text = text.replace('&lt;', '<')
text = text.replace('&gt;', '>')
text = text.replace('&quot;', '"')
text = text.replace('&#39;', "'")
text = text.replace('&nbsp;', ' ')
text = text.replace('&amp;', '&')
return ' '.join(text.split())
def _extract_entities(self, article: NewsArticle) -> ExtractedEntities:
"""Extract MPs, bills, and committees from article text."""
text = f"{article.title} {article.summary}"
# Extract bill IDs (deterministic)
bill_ids = self._extract_bill_ids(text)
# Extract MP mentions (fuzzy matching)
mp_ids = self._extract_mp_mentions(text)
# Extract committee mentions (keyword-based)
committee_codes = self._extract_committee_mentions(text)
return ExtractedEntities(
mp_ids=list(set(mp_ids)),
bill_ids=list(set(bill_ids)),
committee_codes=list(set(committee_codes)),
)
def _extract_bill_ids(self, text: str) -> List[str]:
"""Extract bill IDs from text (e.g., C-11, S-233)."""
matches = BILL_PATTERN.findall(text)
return [f"{m[0].upper()}-{m[1]}" for m in matches]
def _extract_mp_mentions(self, text: str) -> List[str]:
"""Extract MP mentions from text using fuzzy name matching."""
mp_ids = []
text_normalized = normalize_name(text)
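# Illustrative: with "justin trudeau" in the index, a headline like
# "Justin Trudeau responds to critics" normalizes to "justin trudeau responds to
# critics", so the substring check below records that MP's id.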
# Check each MP name in our index
for name, mp_id in self.mp_name_index.items():
if name in text_normalized:
mp_ids.append(mp_id)
return mp_ids
def _extract_committee_mentions(self, text: str) -> List[str]:
"""Extract committee codes from text using keyword matching."""
text_lower = text.lower()
codes = []
for code, keywords in COMMITTEE_KEYWORDS.items():
if any(kw in text_lower for kw in keywords):
codes.append(code)
return codes
def _get_existing_urls(self, source_id: str) -> Set[str]:
"""Get URLs that have already been processed."""
response = self.supabase.table("news_articles_processed")\
.select("url")\
.eq("source_id", source_id)\
.execute()
return {row["url"] for row in (response.data or [])}
def _generate_article_id(self, url: str) -> str:
"""Generate a unique ID for an article based on URL."""
return hashlib.sha256(url.encode()).hexdigest()[:16]
def _store_neo4j_article(self, article: NewsArticle, entities: ExtractedEntities) -> None:
"""Store article in Neo4j with entity relationships."""
article_id = self._generate_article_id(article.url)
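# MERGE keyed on the hashed article id keeps re-ingestion idempotent: an existing
# node is matched rather than duplicated, and SET refreshes its properties.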
# Create NewsArticle node
self.neo4j.run_query("""
MERGE (a:NewsArticle {id: $id})
SET a.url = $url,
a.title = $title,
a.summary = $summary,
a.source = $source,
a.published_at = datetime($published_at),
a.ingested_at = datetime(),
a.image_url = $image_url,
a.published_date = $published_date,
a.description = $summary,
a.last_updated = toString(datetime())
""", {
"id": article_id,
"url": article.url,
"title": article.title,
"summary": article.summary,
"source": article.source_id,
"published_at": article.published_at.isoformat() if article.published_at else None,
"published_date": article.published_at.strftime("%Y-%m-%d") if article.published_at else None,
"image_url": article.image_url,
})
# Create MP relationships
for mp_id in entities.mp_ids:
self.neo4j.run_query("""
MATCH (mp:MP {id: $mp_id})
MATCH (a:NewsArticle {id: $article_id})
MERGE (mp)-[r:MENTIONED_IN]->(a)
SET r.confidence = 1.0, r.mention_count = 1
""", {"mp_id": mp_id, "article_id": article_id})
# Create Bill relationships
for bill_id in entities.bill_ids:
# Bill ID format in Neo4j might vary, try matching by number
self.neo4j.run_query("""
MATCH (b:Bill)
WHERE b.number = $bill_number
MATCH (a:NewsArticle {id: $article_id})
MERGE (b)-[r:MENTIONED_IN]->(a)
SET r.confidence = 1.0, r.mention_count = 1
""", {"bill_number": bill_id, "article_id": article_id})
# Create Committee relationships
for code in entities.committee_codes:
self.neo4j.run_query("""
MATCH (c:Committee {code: $code})
MATCH (a:NewsArticle {id: $article_id})
MERGE (c)-[r:MENTIONED_IN]->(a)
SET r.confidence = 0.8, r.mention_count = 1
""", {"code": code, "article_id": article_id})
def _create_activity_items(self, article: NewsArticle, entities: ExtractedEntities) -> int:
"""Create activity items for users who bookmarked mentioned entities."""
items_created = 0
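# Assumption: the create_news_activity_items RPC (defined Supabase-side) returns the
# number of activity rows it inserted; result.data is treated as that count below.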
# Create for MP mentions
if entities.mp_ids:
result = self.supabase.rpc("create_news_activity_items", {
"p_article_url": article.url,
"p_article_title": article.title,
"p_source_id": article.source_id,
"p_entity_type": "mp",
"p_entity_ids": entities.mp_ids,
}).execute()
items_created += result.data if result.data else 0
# Create for Bill mentions
if entities.bill_ids:
result = self.supabase.rpc("create_news_activity_items", {
"p_article_url": article.url,
"p_article_title": article.title,
"p_source_id": article.source_id,
"p_entity_type": "bill",
"p_entity_ids": entities.bill_ids,
}).execute()
items_created += result.data if result.data else 0
# Create for Committee mentions
if entities.committee_codes:
result = self.supabase.rpc("create_news_activity_items", {
"p_article_url": article.url,
"p_article_title": article.title,
"p_source_id": article.source_id,
"p_entity_type": "committee",
"p_entity_ids": entities.committee_codes,
}).execute()
items_created += result.data if result.data else 0
return items_created
def _mark_processed(self, article: NewsArticle, source_id: str, entity_count: int) -> None:
"""Mark article as processed in Supabase."""
self.supabase.table("news_articles_processed").upsert({
"url": article.url,
"source_id": source_id,
"title": article.title,
"entity_count": entity_count,
"created_activity_items": True,
"image_url": article.image_url,
"summary": article.summary,
}).execute()
def _update_source_timestamp(self, source_id: str) -> None:
"""Update last_fetched_at timestamp for source."""
self.supabase.table("news_sources").update({
"last_fetched_at": datetime.now(timezone.utc).isoformat(),
"error_count": 0, # Reset error count on success
}).eq("id", source_id).execute()
def main():
"""Entry point for news ingestion pipeline."""
import argparse
parser = argparse.ArgumentParser(description="News Integration Pipeline")
parser.add_argument("--source", help="Specific source ID to process")
parser.add_argument("--limit", type=int, help="Max articles per source (for testing)")
parser.add_argument("--dry-run", action="store_true", help="Don't persist to databases")
parser.add_argument("--neo4j-uri", default=os.getenv("NEO4J_URI", "bolt://localhost:7687"))
parser.add_argument("--neo4j-user", default=os.getenv("NEO4J_USERNAME", "neo4j"))
parser.add_argument("--neo4j-password", default=os.getenv("NEO4J_PASSWORD"))
args = parser.parse_args()
# Initialize Neo4j client
neo4j = Neo4jClient(
uri=args.neo4j_uri,
user=args.neo4j_user,
password=args.neo4j_password,
)
# Run pipeline
pipeline = NewsIngestionPipeline(neo4j)
stats = pipeline.run(
source_id=args.source,
limit=args.limit,
dry_run=args.dry_run,
)
# Print summary
print("\n=== News Ingestion Complete ===")
print(f"Sources processed: {stats['sources_processed']}")
print(f"Articles fetched: {stats['articles_fetched']}")
print(f"New articles: {stats['articles_new']}")
print(f"Articles stored: {stats['articles_stored']}")
print(f"Articles scored: {stats['articles_scored']}")
print(f"Entities extracted: {stats['entities_extracted']}")
print(f"Activity items created: {stats['activity_items_created']}")
if stats["errors"]:
print(f"Errors: {len(stats['errors'])}")
for err in stats["errors"]:
print(f" - {err['source']}: {err['error']}")
if __name__ == "__main__":
main()