"""
Scheduled Committee Meetings Ingestion.
This module fetches scheduled committee meetings from OpenParliament API
and stores them in Neo4j for efficient calendar queries.
Meetings with transcripts (has_evidence=True) are imported by the committee
evidence ingestion job. This job handles scheduled/future meetings only.
"""
import sys
from pathlib import Path
from typing import Dict, Any, List, Optional
from datetime import datetime, date, timedelta
from urllib.parse import urljoin

import requests

from ..utils.neo4j_client import Neo4jClient
from ..utils.progress import logger, ProgressTracker
class ScheduledMeetingsImporter:
    """
    Import scheduled committee meetings from OpenParliament API to Neo4j.

    This importer:
    1. Fetches meetings from OpenParliament API (date >= today)
    2. Filters for meetings without evidence (scheduled only)
    3. Creates/updates Meeting nodes in Neo4j
    4. Links meetings to Committee nodes via HELD_MEETING relationship
    5. Cleans up stale scheduled meetings (past dates without evidence)
    """

    OPENPARLIAMENT_API = "https://api.openparliament.ca"
    USER_AGENT = "CanadaGPT/1.0 (contact@canadagpt.ca)"
    REQUEST_TIMEOUT = 30  # seconds, applied to every HTTP request

    def __init__(
        self,
        neo4j_client: "Neo4jClient",
        dry_run: bool = False
    ):
        """
        Initialize Scheduled Meetings importer.

        Args:
            neo4j_client: Neo4j client instance
            dry_run: If True, don't write to Neo4j
        """
        self.neo4j = neo4j_client
        self.dry_run = dry_run

    # ------------------------------------------------------------------
    # HTTP helpers
    # ------------------------------------------------------------------

    def _fetch_objects(
        self,
        url: str,
        params: Dict[str, Any],
        max_items: int
    ) -> List[Dict[str, Any]]:
        """
        Collect the ``objects`` entries of a listing endpoint.

        Additional pages are followed through ``pagination.next_url`` until
        ``max_items`` entries were gathered or the listing is exhausted, so
        a server-side page-size cap does not silently truncate the result.

        Args:
            url: Absolute URL of the listing endpoint
            params: Query parameters for the first request
            max_items: Upper bound on the number of entries returned

        Returns:
            List of raw entries, at most ``max_items`` long

        Raises:
            requests.RequestException: On transport or HTTP status errors
            ValueError: If a response body is not a JSON object
        """
        collected: List[Dict[str, Any]] = []
        next_url: Optional[str] = url
        next_params: Optional[Dict[str, Any]] = params

        while next_url and len(collected) < max_items:
            response = requests.get(
                next_url,
                params=next_params,
                headers={
                    "User-Agent": self.USER_AGENT,
                    "Accept": "application/json",
                },
                timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            payload = response.json()
            if not isinstance(payload, dict):
                raise ValueError("Unexpected API payload: expected a JSON object")

            page = payload.get("objects") or []
            if not page:
                # An empty page means there is nothing left to read; this
                # also prevents looping on a listing that keeps advertising
                # a next page.
                break
            collected.extend(page)

            # NOTE(review): assumes next_url is a (usually relative) URL that
            # already carries the original filters plus the new offset --
            # confirm against the OpenParliament API documentation.
            follow = (payload.get("pagination") or {}).get("next_url")
            next_url = urljoin(f"{self.OPENPARLIAMENT_API}/", follow) if follow else None
            next_params = None  # the query string is part of next_url

        return collected[:max_items]

    def fetch_scheduled_meetings(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        limit: int = 1000
    ) -> List[Dict[str, Any]]:
        """
        Fetch scheduled meetings from OpenParliament API.

        Args:
            start_date: ISO date string for minimum date (default: today)
            end_date: ISO date string for maximum date (optional)
            limit: Maximum meetings to fetch

        Returns:
            List of meeting dictionaries from API that have no evidence yet.
            An empty list is returned when the request fails.
        """
        if not start_date:
            start_date = date.today().isoformat()

        url = f"{self.OPENPARLIAMENT_API}/committees/meetings/"
        params: Dict[str, Any] = {
            "date__gte": start_date,
            "limit": limit,
        }
        if end_date:
            params["date__lte"] = end_date

        logger.info(f"Fetching scheduled meetings from {start_date}...")
        try:
            meetings = self._fetch_objects(url, params, limit)
        except (requests.RequestException, ValueError) as e:
            # ValueError covers undecodable JSON on requests versions whose
            # JSON error is not a RequestException subclass.
            logger.error(f"Error fetching scheduled meetings: {e}")
            return []

        # Filter for meetings without evidence (scheduled only)
        scheduled = [m for m in meetings if not m.get("has_evidence", False)]
        logger.info(f"Found {len(meetings)} meetings, {len(scheduled)} scheduled (no evidence)")
        return scheduled

    def fetch_committee_names(self) -> Dict[str, str]:
        """
        Fetch committee metadata for display names.

        Returns:
            Dict mapping committee code to display name. An empty dict is
            returned when the request fails.
        """
        url = f"{self.OPENPARLIAMENT_API}/committees/"
        try:
            committees = self._fetch_objects(url, {"limit": 1000}, 1000)
        except (requests.RequestException, ValueError) as e:
            logger.error(f"Error fetching committee names: {e}")
            return {}

        name_map: Dict[str, str] = {}
        for c in committees:
            # Extract code from URL like "/committees/finance/" -> "finance"
            code = self._parse_committee_code(c.get("url"))
            if code is None:
                continue
            name_map[code] = self._display_name(c, code)

        logger.info(f"Fetched {len(name_map)} committee names")
        return name_map

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_committee_code(committee_url: Optional[str]) -> Optional[str]:
        """Return the path after ``/committees/`` without trailing slashes, or None."""
        if not committee_url or not isinstance(committee_url, str):
            return None
        parts = committee_url.split("/committees/")
        if len(parts) < 2:
            return None
        return parts[1].rstrip("/") or None

    @staticmethod
    def _english_text(value: Any) -> Optional[str]:
        """Return the English text of a localized field (dict with "en" or plain string)."""
        if isinstance(value, dict):
            value = value.get("en")
        if isinstance(value, str) and value.strip():
            return value
        return None

    @classmethod
    def _display_name(cls, committee: Dict[str, Any], code: str) -> str:
        """Pick short name, then full name, then the code itself as display name."""
        return (
            cls._english_text(committee.get("short_name"))
            or cls._english_text(committee.get("name"))
            or code
        )

    def extract_committee_code(self, committee_url: str) -> str:
        """
        Extract committee code from OpenParliament URL.

        Args:
            committee_url: URL such as "/committees/finance/"

        Returns:
            The committee code ("finance"), or "unknown" when the URL is
            missing, empty, or does not contain "/committees/".
        """
        return self._parse_committee_code(committee_url) or "unknown"

    def _build_meeting_record(
        self,
        meeting: Dict[str, Any],
        committee_names: Dict[str, str]
    ) -> Optional[Dict[str, Any]]:
        """
        Convert one API meeting into the parameter map used by the import query.

        Returns:
            The record, or None when ``number`` or ``parliament_number`` is
            missing: both are MERGE keys and Cypher rejects null values there.
        """
        number = meeting.get("number")
        parliament = meeting.get("parliament_number")
        if number is None or parliament is None:
            return None

        committee_code = self.extract_committee_code(meeting.get("committee_url"))
        return {
            # Stable ID, format: committee_code-number-parliament (e.g., "fina-42-44")
            "id": f"{committee_code}-{number}-{parliament}",
            "committee_code": committee_code,
            # NOTE(review): committee_name is prepared here but the import
            # query does not persist it -- confirm whether that is intended.
            "committee_name": committee_names.get(committee_code, committee_code),
            "number": number,
            "parliament": parliament,
            "session": f"{meeting.get('parliament_number', '')}-{meeting.get('session_number', '')}",
            "date": meeting.get("date"),
            "in_camera": meeting.get("in_camera", False),
            "has_evidence": False,
            "scheduled_source": "openparliament",
            "subject": meeting.get("subject"),
        }

    # ------------------------------------------------------------------
    # Import
    # ------------------------------------------------------------------

    def import_meetings(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        cleanup_stale: bool = True
    ) -> Dict[str, int]:
        """
        Import scheduled meetings to Neo4j.

        Args:
            start_date: ISO date string for minimum date (default: today)
            end_date: ISO date string for maximum date (optional)
            cleanup_stale: Remove stale scheduled meetings (past dates without evidence)

        Returns:
            Dict with import statistics: ``meetings_created``,
            ``meetings_updated``, ``committees_linked`` (meetings linked to at
            least one committee), ``stale_cleaned`` and ``errors`` (skipped
            records plus failed write batches).
        """
        stats = {
            "meetings_created": 0,
            "meetings_updated": 0,
            "committees_linked": 0,
            "stale_cleaned": 0,
            "errors": 0
        }

        # Fetch data
        meetings = self.fetch_scheduled_meetings(start_date, end_date)
        if not meetings:
            logger.warning("No scheduled meetings to import")
            # Stale entries must be purged even when nothing new is scheduled,
            # otherwise past meetings linger for as long as the calendar is empty.
            if cleanup_stale and not self.dry_run:
                stats["stale_cleaned"] = self._cleanup_stale_meetings()
            return stats

        committee_names = self.fetch_committee_names()

        # Prepare meeting data for batch import
        meeting_data = []
        skipped = 0
        for m in meetings:
            record = self._build_meeting_record(m, committee_names)
            if record is None:
                skipped += 1
            else:
                meeting_data.append(record)

        if skipped:
            logger.warning(
                f"Skipped {skipped} meetings without number or parliament_number"
            )
            stats["errors"] += skipped

        if self.dry_run:
            logger.info(f"DRY RUN: Would import {len(meeting_data)} meetings")
            for m in meeting_data[:5]:
                logger.info(f"  - {m['committee_code']} #{m['number']} on {m['date']}")
            return stats

        if meeting_data:
            # Batch import meetings
            logger.info(f"Importing {len(meeting_data)} scheduled meetings...")
            try:
                # Use MERGE to create or update meetings. A node counts as
                # created when its created_at equals the timestamp bound for
                # its own row; matched nodes carry an older (or no) created_at.
                result = self.neo4j.run_query("""
                    UNWIND $meetings AS m
                    WITH m, datetime() AS now
                    MERGE (meeting:Meeting {
                        committee_code: m.committee_code,
                        number: m.number,
                        parliament: m.parliament
                    })
                    ON CREATE SET
                        meeting.id = m.id,
                        meeting.date = date(m.date),
                        meeting.in_camera = m.in_camera,
                        meeting.has_evidence = false,
                        meeting.scheduled_source = m.scheduled_source,
                        meeting.subject = m.subject,
                        meeting.session = m.session,
                        meeting.created_at = now,
                        meeting.updated_at = now
                    ON MATCH SET
                        meeting.date = date(m.date),
                        meeting.in_camera = m.in_camera,
                        meeting.subject = m.subject,
                        meeting.session = m.session,
                        meeting.updated_at = now
                    RETURN
                        count(meeting) AS total,
                        sum(CASE WHEN meeting.created_at = now THEN 1 ELSE 0 END) AS created
                """, {"meetings": meeting_data})
                total = result[0]["total"] if result else 0
                created = result[0]["created"] if result else 0
                stats["meetings_created"] = created
                stats["meetings_updated"] = total - created

                # Link to Committee in a separate statement so that meetings
                # without a matching committee do not distort the counts above.
                link_result = self.neo4j.run_query("""
                    UNWIND $meetings AS m
                    MATCH (meeting:Meeting {
                        committee_code: m.committee_code,
                        number: m.number,
                        parliament: m.parliament
                    })
                    MATCH (c:Committee)
                    WHERE toLower(c.code) = toLower(m.committee_code)
                       OR toLower(c.short_name) = toLower(m.committee_code)
                    MERGE (c)-[:HELD_MEETING]->(meeting)
                    RETURN count(DISTINCT meeting) AS linked
                """, {"meetings": meeting_data})
                stats["committees_linked"] = link_result[0]["linked"] if link_result else 0

                logger.success(f"Imported {total} scheduled meetings")
                logger.info(
                    f"  {stats['meetings_created']} created, "
                    f"{stats['meetings_updated']} updated, "
                    f"{stats['committees_linked']} linked to a committee"
                )
            except Exception as e:
                logger.error(f"Error importing meetings: {e}")
                stats["errors"] += 1
                return stats

        # Clean up stale scheduled meetings
        if cleanup_stale:
            stats["stale_cleaned"] = self._cleanup_stale_meetings()
        return stats

    def _cleanup_stale_meetings(self) -> int:
        """
        Remove stale scheduled meetings (past dates without evidence).

        Meetings that are past their scheduled date but still have
        has_evidence=false are considered stale and should be cleaned up.

        Returns:
            Number of stale meetings removed (0 when the query fails)
        """
        logger.info("Cleaning up stale scheduled meetings...")
        try:
            # Delete meetings that are:
            # 1. Past their scheduled date
            # 2. Still have has_evidence = false
            # 3. Were sourced from OpenParliament scheduled meetings
            result = self.neo4j.run_query("""
                MATCH (m:Meeting)
                WHERE m.has_evidence = false
                  AND m.scheduled_source = 'openparliament'
                  AND m.date < date()
                DETACH DELETE m
                RETURN count(m) AS deleted
            """)
            deleted = result[0]["deleted"] if result else 0
            if deleted > 0:
                logger.info(f"Cleaned up {deleted} stale scheduled meetings")
            return deleted
        except Exception as e:
            logger.error(f"Error cleaning up stale meetings: {e}")
            return 0

    def get_stats(self) -> Dict[str, Any]:
        """
        Get current stats on scheduled meetings in Neo4j.

        Returns:
            Dict with ``total_scheduled``, ``future_meetings`` and
            ``past_without_evidence`` counts over Meeting nodes whose
            has_evidence is false; empty dict when the query fails or
            returns nothing.
        """
        try:
            result = self.neo4j.run_query("""
                MATCH (m:Meeting)
                WHERE m.has_evidence = false
                RETURN
                    count(m) AS total_scheduled,
                    count(CASE WHEN m.date >= date() THEN 1 END) AS future_meetings,
                    count(CASE WHEN m.date < date() THEN 1 END) AS past_without_evidence
            """)
            if result:
                return {
                    "total_scheduled": result[0]["total_scheduled"],
                    "future_meetings": result[0]["future_meetings"],
                    "past_without_evidence": result[0]["past_without_evidence"]
                }
            return {}
        except Exception as e:
            logger.error(f"Error getting stats: {e}")
            return {}