#!/usr/bin/env python3
"""
News Ingestion Cloud Run Job
This job fetches Canadian political news from RSS feeds, extracts entity mentions
(MPs, bills, committees), stores articles in Neo4j, and creates activity feed
items in Supabase.
News Sources:
- CBC Politics
- Globe & Mail Politics
- National Post Politics
- CTV News Politics
- iPolitics
- LEGISinfo Bill Events
Environment variables required:
- NEO4J_URI: Neo4j connection URI (default: bolt://10.128.0.3:7687)
- NEO4J_USERNAME: Neo4j username (default: neo4j)
- NEO4J_PASSWORD: Neo4j password
- SUPABASE_URL: Supabase project URL
- SUPABASE_SERVICE_ROLE_KEY: Supabase service role key
"""
import sys
import os
from fedmcp_pipeline.utils.neo4j_client import Neo4jClient
from fedmcp_pipeline.utils.progress import logger
from fedmcp_pipeline.ingest.news import NewsIngestionPipeline
def main():
    """Run the news ingestion Cloud Run job.

    Reads connection settings from environment variables, validates the
    required ones, runs the news ingestion pipeline against Neo4j and
    Supabase, logs a run summary, and exits with status 1 on fatal failure
    (missing configuration, pipeline exception, or all sources failing).
    """
    logger.info("=" * 80)
    logger.info("NEWS INGESTION CLOUD RUN JOB - STARTING")
    logger.info("=" * 80)
    print()

    # Get environment variables (defaults match the deployed topology)
    neo4j_uri = os.getenv('NEO4J_URI', 'bolt://10.128.0.3:7687')
    neo4j_user = os.getenv('NEO4J_USERNAME', 'neo4j')
    neo4j_password = os.getenv('NEO4J_PASSWORD')
    supabase_url = os.getenv('SUPABASE_URL')
    supabase_key = os.getenv('SUPABASE_SERVICE_ROLE_KEY')
    source_id = os.getenv('NEWS_SOURCE_ID')  # Optional: specific source to process
    limit = os.getenv('NEWS_ARTICLE_LIMIT')  # Optional: max articles per source

    # Validate required environment variables up front so the job fails
    # fast with a single message listing everything that is missing.
    missing = []
    if not neo4j_password:
        missing.append("NEO4J_PASSWORD")
    if not supabase_url:
        missing.append("SUPABASE_URL")
    if not supabase_key:
        missing.append("SUPABASE_SERVICE_ROLE_KEY")
    if missing:
        logger.error(f"Missing required environment variables: {', '.join(missing)}")
        sys.exit(1)

    # Parse optional per-source article limit; a non-integer value is
    # logged and ignored rather than aborting the job.
    article_limit = None
    if limit:
        try:
            article_limit = int(limit)
        except ValueError:
            logger.warning(f"Invalid NEWS_ARTICLE_LIMIT value: {limit}, ignoring")

    logger.info("Configuration:")
    logger.info(f" Neo4j URI: {neo4j_uri}")
    # Fix: only append an ellipsis when the URL was actually truncated,
    # so short URLs are not logged with a misleading "..." suffix.
    url_display = supabase_url[:50] + ("..." if len(supabase_url) > 50 else "")
    logger.info(f" Supabase URL: {url_display}")
    if source_id:
        logger.info(f" Source filter: {source_id}")
    # Fix: compare against None so an explicit limit of 0 is still logged,
    # matching what is actually passed to the pipeline below.
    if article_limit is not None:
        logger.info(f" Article limit per source: {article_limit}")
    print()

    logger.info(f"Connecting to Neo4j at {neo4j_uri}...")
    neo4j = Neo4jClient(uri=neo4j_uri, user=neo4j_user, password=neo4j_password)
    try:
        logger.info("Running news ingestion...")
        logger.info("This will:")
        logger.info(" - Fetch RSS feeds from Canadian political news sources")
        logger.info(" - Extract entity mentions (MPs, bills, committees)")
        logger.info(" - Store NewsArticle nodes in Neo4j with relationships")
        logger.info(" - Create activity_items in Supabase for bookmarked entities")
        print()

        # Create pipeline and run
        pipeline = NewsIngestionPipeline(
            neo4j_client=neo4j,
            supabase_url=supabase_url,
            supabase_key=supabase_key,
        )
        stats = pipeline.run(
            source_id=source_id,
            limit=article_limit,
            dry_run=False,
        )

        print()
        logger.info("=" * 80)
        logger.info("NEWS INGESTION CLOUD RUN JOB - COMPLETED")
        logger.info("=" * 80)
        print()
        logger.info("Summary:")
        logger.info(f" Sources processed: {stats['sources_processed']}")
        logger.info(f" Articles fetched: {stats['articles_fetched']}")
        logger.info(f" New articles: {stats['articles_new']}")
        logger.info(f" Articles stored: {stats['articles_stored']}")
        logger.info(f" Entities extracted: {stats['entities_extracted']}")
        logger.info(f" Activity items created: {stats['activity_items_created']}")

        if stats['errors']:
            logger.warning(f" Errors: {len(stats['errors'])}")
            for err in stats['errors']:
                logger.warning(f" - {err['source']}: {err['error']}")

        # Treat "every source failed" as a job failure so Cloud Run reports
        # a non-zero exit and the run can be retried / alerted on.
        # (SystemExit does not derive from Exception, so the handler below
        # will not swallow this deliberate exit.)
        if stats['sources_processed'] == 0 and stats['errors']:
            logger.error("All sources failed!")
            sys.exit(1)
    except Exception as e:
        logger.error(f"News ingestion job failed: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        # Always release the Neo4j driver, even on failure paths.
        neo4j.close()
# Script entry point: run the job only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()