Skip to main content
Glama
northernvariables

FedMCP - Federal Parliamentary Information

daily-committee-import.py7.51 kB
#!/usr/bin/env python3 """Daily committee meeting import job - discovers and imports new committee meetings.""" import sys import os from pathlib import Path from datetime import datetime, timedelta from typing import Dict, Any, List, Optional import re import requests from bs4 import BeautifulSoup # Add packages to path sys.path.insert(0, str(Path(__file__).parent.parent / 'packages' / 'data-pipeline')) from fedmcp_pipeline.utils.neo4j_client import Neo4jClient from fedmcp_pipeline.utils.progress import logger def parse_meeting_html(html: str, meeting_date: str) -> List[Dict[str, Any]]: """Parse FilteredMeetings HTML to extract meeting metadata.""" soup = BeautifulSoup(html, 'html.parser') meetings = [] # Find all meeting items meeting_items = soup.find_all('div', class_=re.compile(r'accordion-item meeting-item')) logger.info(f"Found {len(meeting_items)} meeting items in HTML") for item in meeting_items: try: # Extract meeting ID from element ID meeting_id_match = re.search(r'meeting-item-(\d+)', item.get('id', '')) if not meeting_id_match: continue meeting_id = meeting_id_match.group(1) # Extract committee acronym acronym_elem = item.find('span', class_='meeting-acronym') if not acronym_elem: continue committee_acronym = acronym_elem.text.strip() # Extract time time_elem = item.find('div', class_='the-time') time_str = time_elem.text.strip() if time_elem else "" # Extract subject/bill subject_elem = item.find('div', class_='studies-activities-item') subject = subject_elem.text.strip() if subject_elem else "" # Extract status status_elem = item.find('div', class_='meeting-card-meeting-status') status = status_elem.text.strip() if status_elem else "Unknown" # Check for webcast webcast = bool(item.find('i', class_='icon-web-video-cast')) meeting = { 'meeting_id': meeting_id, 'committee_acronym': committee_acronym, 'date': meeting_date, 'time_str': time_str, 'subject': subject, 'status': status, 'webcast': webcast, } meetings.append(meeting) except Exception as e: logger.warning(f"Error parsing meeting item: {e}") continue return meetings def fetch_meetings_for_date(date_str: str) -> List[Dict[str, Any]]: """Fetch committee meetings for a specific date using FilteredMeetings endpoint.""" url = f"https://www.ourcommons.ca/committees/en/FilteredMeetings?meetingDate={date_str}" try: logger.info(f"Fetching meetings for {date_str}: {url}") response = requests.get(url, timeout=30) response.raise_for_status() meetings = parse_meeting_html(response.text, date_str) logger.info(f"Found {len(meetings)} meetings on {date_str}") return meetings except requests.exceptions.RequestException as e: logger.error(f"Error fetching meetings for {date_str}: {e}") return [] def import_meeting_to_neo4j(neo4j: Neo4jClient, meeting: Dict[str, Any]): """Import or update a meeting in Neo4j.""" # Check if meeting already exists check_query = """ MATCH (m:Meeting {ourcommons_meeting_id: $meeting_id}) RETURN m.ourcommons_meeting_id as id """ result = neo4j.run_query(check_query, {"meeting_id": meeting['meeting_id']}) if result and len(result) > 0: logger.info(f"Meeting {meeting['meeting_id']} already exists, skipping") return False # Ensure committee exists committee_query = """ MERGE (c:Committee {code: $code}) ON CREATE SET c.created_at = datetime() RETURN c.code as code """ neo4j.run_query(committee_query, {"code": meeting['committee_acronym']}) # Create meeting node meeting_data = { 'ourcommons_meeting_id': meeting['meeting_id'], 'committee_code': meeting['committee_acronym'], 'date': meeting['date'], 'time_description': meeting['time_str'], 'subject': meeting['subject'], 'status': meeting['status'], 'webcast_available': meeting['webcast'], 'source': 'ourcommons_filtered_meetings', 'imported_at': datetime.utcnow().isoformat(), } # Parse meeting number if available (will be null initially) meeting_data['number'] = None create_query = """ CREATE (m:Meeting) SET m = $meeting_data SET m.created_at = datetime() WITH m MATCH (c:Committee {code: $committee_code}) MERGE (c)-[:HELD_MEETING]->(m) RETURN m.ourcommons_meeting_id as id """ result = neo4j.run_query(create_query, { "meeting_data": meeting_data, "committee_code": meeting['committee_acronym'] }) logger.success(f"✓ Created Meeting {meeting['meeting_id']} for {meeting['committee_acronym']}") return True def check_and_import_recent_meetings(neo4j: Neo4jClient, lookback_days: int = 7): """Check for and import any missing committee meetings from the last N days.""" imported_count = 0 # Get dates to check (last N days, including weekends since committees can meet then) dates_to_check = [] for i in range(lookback_days, -1, -1): date = datetime.now() - timedelta(days=i) dates_to_check.append(date.strftime('%Y-%m-%d')) logger.info(f"Checking for committee meetings on {len(dates_to_check)} dates") logger.info(f"Date range: {dates_to_check[0]} to {dates_to_check[-1]}") # Fetch meetings for each date for date_str in dates_to_check: meetings = fetch_meetings_for_date(date_str) for meeting in meetings: try: if import_meeting_to_neo4j(neo4j, meeting): imported_count += 1 except Exception as e: logger.error(f"Failed to import meeting {meeting.get('meeting_id')}: {e}") return imported_count def main(): """Main entry point for daily committee import job.""" logger.info("=" * 80) logger.info("DAILY COMMITTEE MEETING IMPORT JOB") logger.info(f"Started at: {datetime.now().isoformat()}") logger.info("=" * 80) # Get Neo4j connection from environment neo4j_uri = os.getenv('NEO4J_URI', 'bolt://10.128.0.3:7687') neo4j_user = os.getenv('NEO4J_USERNAME', 'neo4j') neo4j_password = os.getenv('NEO4J_PASSWORD') if not neo4j_password: logger.error("NEO4J_PASSWORD environment variable not set") sys.exit(1) neo4j = Neo4jClient(uri=neo4j_uri, user=neo4j_user, password=neo4j_password) try: # Check for meetings from last 7 days imported = check_and_import_recent_meetings(neo4j, lookback_days=7) logger.info("=" * 80) if imported > 0: logger.success(f"✅ Successfully imported {imported} new committee meeting(s)") else: logger.info("ℹ️ No new committee meetings found") logger.info("=" * 80) except Exception as e: logger.error(f"Job failed with error: {str(e)}") import traceback traceback.print_exc() sys.exit(1) finally: neo4j.close() if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/northernvariables/FedMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server