#!/usr/bin/env python3
"""
Scheduled Meetings Ingestion Cloud Run Job
This job imports scheduled committee meetings from OpenParliament API
and stores them in Neo4j for efficient calendar queries.
Features:
- Fetches scheduled meetings from OpenParliament API
- Creates Meeting nodes with has_evidence = false
- Links meetings to Committee nodes
- Cleans up stale scheduled meetings (past dates)
Schedule: Daily at 5am UTC (before other committee jobs)
Environment variables required:
- NEO4J_URI: Neo4j connection URI (default: bolt://10.128.0.3:7687)
- NEO4J_USERNAME: Neo4j username (default: neo4j)
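- NEO4J_PASSWORD: Neo4j password (the only variable without a default; the job exits with status 1 if it is unset)
- DRY_RUN: If "true" (case-insensitive), don't write to Neo4j (default: false); the --dry-run flag has the same effect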
"""
import sys
import os
import argparse
from fedmcp_pipeline.utils.neo4j_client import Neo4jClient
from fedmcp_pipeline.utils.progress import logger
from fedmcp_pipeline.ingest.scheduled_meetings import ScheduledMeetingsImporter
def _build_arg_parser():
    """Return the argument parser describing this job's command-line flags."""
    cli_parser = argparse.ArgumentParser(description="Import scheduled committee meetings")
    cli_parser.add_argument("--dry-run", action="store_true", help="Don't write to Neo4j")
    cli_parser.add_argument("--start-date", type=str, help="Start date (ISO format, default: today)")
    cli_parser.add_argument("--end-date", type=str, help="End date (ISO format, optional)")
    cli_parser.add_argument("--no-cleanup", action="store_true", help="Skip cleanup of stale meetings")
    return cli_parser


def _log_banner(title):
    """Log *title* framed above and below by an 80-character rule."""
    rule = "=" * 80
    logger.info(rule)
    logger.info(title)
    logger.info(rule)


def main():
    """Run the scheduled meetings ingestion job from start to finish.

    Reads the command-line flags and the NEO4J_* / DRY_RUN environment
    variables, opens a Neo4j client, logs the scheduled-meeting stats,
    runs ``ScheduledMeetingsImporter.import_meetings`` and logs the outcome.

    The process exits with status 1 when NEO4J_PASSWORD is unset, when the
    import reports one or more errors, or when any exception escapes the
    run. Once the client has been created it is closed on every path.
    """
    options = _build_arg_parser().parse_args()

    _log_banner("SCHEDULED MEETINGS INGESTION CLOUD RUN JOB - STARTING")
    print()

    # Connection settings come from the environment; only the password
    # has no fallback value.
    password = os.getenv('NEO4J_PASSWORD')
    uri = os.getenv('NEO4J_URI', 'bolt://10.128.0.3:7687')
    username = os.getenv('NEO4J_USERNAME', 'neo4j')

    # The CLI flag wins; otherwise DRY_RUN must spell "true" (any casing).
    if options.dry_run:
        dry_run = True
    else:
        dry_run = os.getenv('DRY_RUN', 'false').lower() == 'true'

    if not password:
        logger.error("NEO4J_PASSWORD environment variable not set!")
        sys.exit(1)

    if dry_run:
        logger.warning("DRY RUN MODE - No changes will be made to Neo4j")

    run_cleanup = not options.no_cleanup

    logger.info(f"Connecting to Neo4j at {uri}...")
    client = Neo4jClient(uri=uri, user=username, password=password)

    try:
        # Announce what this run is going to do.
        plan = [
            "Running scheduled meetings ingestion from OpenParliament API...",
            "This will:",
            " - Fetch scheduled meetings from OpenParliament API",
            " - Create/update Meeting nodes with has_evidence=false",
            " - Link meetings to Committee nodes",
        ]
        if run_cleanup:
            plan.append(" - Clean up stale scheduled meetings (past dates)")
        for step in plan:
            logger.info(step)
        print()

        importer = ScheduledMeetingsImporter(client, dry_run=dry_run)

        # Snapshot of what is already stored, logged before anything changes.
        before = importer.get_stats()
        if before:
            stat_lines = (
                ("Current scheduled meetings in Neo4j", 'total_scheduled'),
                (" - Future meetings", 'future_meetings'),
                (" - Past without evidence", 'past_without_evidence'),
            )
            for label, key in stat_lines:
                logger.info(f"{label}: {before.get(key, 0)}")
            print()

        result = importer.import_meetings(
            start_date=options.start_date,
            end_date=options.end_date,
            cleanup_stale=run_cleanup,
        )
        print()

        logger.success(f"Successfully imported {result['meetings_created']} scheduled meetings")
        if result['stale_cleaned'] > 0:
            logger.info(f"Cleaned up {result['stale_cleaned']} stale meetings")
        if result['errors'] > 0:
            logger.warning(f"{result['errors']} errors occurred")

        # Snapshot again so the log shows the resulting total.
        after = importer.get_stats()
        if after:
            logger.info(f"Total scheduled meetings in Neo4j: {after.get('total_scheduled', 0)}")

        _log_banner("SCHEDULED MEETINGS INGESTION CLOUD RUN JOB - COMPLETED")
        print()

        # A run that recorded errors still fails the job. SystemExit is not
        # an Exception subclass, so the handler below does not intercept it.
        if result['errors'] > 0:
            sys.exit(1)
    except Exception as exc:
        logger.error(f"Scheduled meetings ingestion job failed: {exc}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        client.close()
# Run the job only when this file is executed as a script; importing the
# module does not call main().
if __name__ == "__main__":
    main()