
FedMCP - Federal Parliamentary Information

daily-hansard-import.py
#!/usr/bin/env python3
"""Daily Hansard import job - checks for new debates and imports them with enhanced metadata."""

import sys
import os
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
import json

import requests

# Add packages to path
sys.path.insert(0, str(Path(__file__).parent.parent / 'packages' / 'data-pipeline'))
sys.path.insert(0, str(Path(__file__).parent.parent / 'packages' / 'fedmcp' / 'src'))

from fedmcp_pipeline.utils.neo4j_client import Neo4jClient
from fedmcp_pipeline.utils.progress import logger
from fedmcp.clients.ourcommons import OurCommonsHansardClient
from fedmcp_pipeline.ingest.hansard import link_statements_to_mps_by_name, extract_hansard_keywords


def parse_hansard_with_enhanced_metadata(xml_text: str, source_url: str) -> Dict[str, Any]:
    """Parse Hansard XML with enhanced metadata and proper topic hierarchy extraction."""
    from xml.etree import ElementTree as ET

    client = OurCommonsHansardClient()
    sitting = client.parse_sitting(xml_text, source_url=source_url)

    logger.info(f"Parsing Hansard No. {sitting.number}, Date: {sitting.date}")

    # Parse XML to extract proper topic hierarchy (OrderOfBusiness → SubjectOfBusiness)
    # This gives us semantic topics like "The Budget", "Bill C-21" instead of just "Debate"
    tree = ET.fromstring(xml_text)

    # Build intervention_id to topic mapping
    intervention_topics = {}  # intervention_id -> (h1_en, h2_en)

    # Navigate XML hierarchy: OrderOfBusiness → SubjectOfBusiness → Interventions
    for order in tree.findall(".//OrderOfBusiness"):
        # Get h1_en: OrderOfBusinessTitle (e.g., "Government Orders", "Oral Questions")
        order_title_el = order.find("OrderOfBusinessTitle")
        h1_en = "".join(order_title_el.itertext()).strip() if order_title_el is not None else "Hansard"

        # Get all SubjectOfBusiness under this order
        subjects = order.findall(".//SubjectOfBusiness")
        for subject in subjects:
            # Get h2_en: SubjectOfBusinessTitle (e.g., "The Budget", "Bill C-21", "Agriculture")
            subject_title_el = subject.find("SubjectOfBusinessTitle")
            h2_en = "".join(subject_title_el.itertext()).strip() if subject_title_el is not None else None

            # Get all interventions under this subject
            content = subject.find("SubjectOfBusinessContent")
            if content is not None:
                for intervention in content.findall(".//Intervention"):
                    intervention_id = intervention.get("id")
                    if intervention_id:
                        intervention_topics[intervention_id] = (h1_en, h2_en)

    logger.info(f"Extracted {len(intervention_topics)} intervention topic mappings from XML hierarchy")

    # Extract speeches with enhanced metadata from all sections
    speeches = []
    for section in sitting.sections:
        for speech in section.speeches:
            # Get topics from our mapping, fallback to section title
            if speech.intervention_id and speech.intervention_id in intervention_topics:
                h1_en, h2_en = intervention_topics[speech.intervention_id]
            else:
                # Fallback if intervention not found in hierarchy
                h1_en = section.title
                h2_en = speech.intervention_type or section.title

            speeches.append({
                # Basic fields
                "speaker_name": speech.speaker_name,
                "timecode": speech.timecode,
                "text": speech.text,
                "h1_en": h1_en,
                "h2_en": h2_en,  # Now contains semantic topic titles from SubjectOfBusinessTitle!
                # Enhanced metadata fields
                "person_db_id": speech.person_db_id,
                "role_type_code": speech.role_type_code,
                "intervention_id": speech.intervention_id,
                "paragraph_ids": speech.paragraph_ids,
                "timestamp_hour": speech.timestamp_hour,
                "timestamp_minute": speech.timestamp_minute,
                "floor_language": speech.floor_language,
                "intervention_type": speech.intervention_type,
                "party": speech.party,
                "riding": speech.riding,
            })

    logger.info(f"Extracted {len(speeches)} speeches with enhanced metadata and semantic topics")

    return {
        "number": sitting.number,
        "date": sitting.date,
        "speeches": speeches,
        # Enhanced document metadata
        "creation_timestamp": sitting.creation_timestamp,
        "speaker_of_day": sitting.speaker_of_day,
        "hansard_document_id": sitting.hansard_document_id,
        "parliament_number": sitting.parliament_number,
        "session_number": sitting.session_number,
        "volume": sitting.volume,
    }


def import_hansard_to_neo4j(neo4j: Neo4jClient, hansard_data: Dict[str, Any], iso_date: str,
                            document_id: int, sitting_number: str):
    """Import parsed Hansard to Neo4j with enhanced metadata."""
    logger.info(f"Importing Hansard to Neo4j as Document {document_id}...")

    # Delete existing document if it exists
    neo4j.run_query("MATCH (d:Document {id: $doc_id}) DETACH DELETE d", {"doc_id": document_id})

    # Create Document node with enhanced metadata
    document_data = {
        "id": document_id,
        "date": iso_date,
        "document_type": "D",
        "public": True,
        "source": "ourcommons_xml_enhanced",
        "number": int(sitting_number),
        "updated_at": datetime.now().isoformat(),
    }

    # Add enhanced document metadata
    if hansard_data.get("creation_timestamp"):
        document_data["creation_timestamp"] = hansard_data["creation_timestamp"]
    if hansard_data.get("speaker_of_day"):
        document_data["speaker_of_day"] = hansard_data["speaker_of_day"]
    if hansard_data.get("hansard_document_id"):
        document_data["hansard_document_id"] = hansard_data["hansard_document_id"]
    if hansard_data.get("parliament_number") is not None:
        document_data["parliament_number"] = hansard_data["parliament_number"]
    if hansard_data.get("session_number") is not None:
        document_data["session_number"] = hansard_data["session_number"]
    if hansard_data.get("parliament_number") and hansard_data.get("session_number"):
        # Dynamically updates the session_id
        document_data["session_id"] = f"{hansard_data['parliament_number']}-{hansard_data['session_number']}"
    if hansard_data.get("volume"):
        document_data["volume"] = hansard_data["volume"]

    cypher = """
    CREATE (d:Document)
    SET d = $doc
    SET d.updated_at = datetime()
    RETURN d.id as created_id
    """
    result = neo4j.run_query(cypher, {"doc": document_data})
    logger.success(f"✓ Created Document node: {result[0]['created_id']}")

    # Create Statement nodes with enhanced metadata
    statements_data = []
    mp_link_data = []  # For person_db_id-based MP linking

    for idx, speech in enumerate(hansard_data["speeches"], start=1):
        # Use intervention_id if available, otherwise generate
        statement_id = speech.get("intervention_id") or f"{document_id}-{idx}"
        wordcount = len(speech["text"].split()) if speech["text"] else 0

        # Format time as ISO-8601 DateTime (Neo4j requires seconds)
        time_value = None
        if speech.get("timecode"):
            timecode = speech["timecode"]
            # Ensure timecode has seconds (HH:MM -> HH:MM:00)
            if len(timecode) == 5 and timecode.count(":") == 1:
                timecode = f"{timecode}:00"
            time_value = f"{iso_date}T{timecode}"

        statement = {
            "id": statement_id,
            "document_id": document_id,
            "time": time_value,
            "who_en": speech.get("speaker_name") or "",
            "content_en": (speech.get("text") or "")[:10000],  # Limit content size
            "h1_en": speech.get("h1_en"),
            "h2_en": speech.get("h2_en"),
            "statement_type": "speech",
            "wordcount": wordcount,
            "procedural": False,
        }

        # Add enhanced metadata fields
        if speech.get("person_db_id") is not None:
            statement["person_db_id"] = speech["person_db_id"]
            # Set politician_id for speaker_count queries (same as person_db_id)
            statement["politician_id"] = speech["person_db_id"]
            mp_link_data.append({
                "statement_id": statement_id,
                "person_db_id": speech["person_db_id"]
            })
        if speech.get("role_type_code") is not None:
            statement["role_type_code"] = speech["role_type_code"]
        if speech.get("intervention_id"):
            statement["intervention_id"] = speech["intervention_id"]
        if speech.get("paragraph_ids"):
            statement["paragraph_ids"] = json.dumps(speech["paragraph_ids"])
        if speech.get("timestamp_hour") is not None:
            statement["timestamp_hour"] = speech["timestamp_hour"]
        if speech.get("timestamp_minute") is not None:
            statement["timestamp_minute"] = speech["timestamp_minute"]
        if speech.get("floor_language"):
            statement["floor_language"] = speech["floor_language"]
        if speech.get("intervention_type"):
            statement["intervention_type"] = speech["intervention_type"]
        if speech.get("party"):
            statement["party"] = speech["party"]
        if speech.get("riding"):
            statement["riding"] = speech["riding"]

        statements_data.append(statement)

    cypher = """
    UNWIND $statements AS stmt
    CREATE (s:Statement)
    SET s = stmt
    SET s.updated_at = datetime()
    RETURN count(s) as created_count
    """
    result = neo4j.run_query(cypher, {"statements": statements_data})
    logger.success(f"✓ Created {result[0]['created_count']} Statement nodes with enhanced metadata")

    # Link statements to document
    cypher = """
    MATCH (s:Statement {document_id: $doc_id})
    MATCH (d:Document {id: $doc_id})
    MERGE (s)-[:PART_OF]->(d)
    RETURN count(*) as linked
    """
    result = neo4j.run_query(cypher, {"doc_id": document_id})
    logger.success(f"✓ Created {result[0]['linked']} PART_OF relationships")

    # Link statements to MPs using hansard_db_id (exact matching) with name matching fallback
    linked_by_dbid = 0
    linked_by_name = 0

    if mp_link_data:
        # Try exact DbId matching FIRST
        link_query = """
        UNWIND $links AS link
        MATCH (s:Statement {id: link.statement_id})
        MATCH (mp:MP {hansard_db_id: link.person_db_id})
        MERGE (s)-[:MADE_BY]->(mp)
        RETURN count(*) as linked_count
        """
        result = neo4j.run_query(link_query, {"links": mp_link_data})
        linked_by_dbid = result[0]['linked_count'] if result else 0
        logger.success(f"✓ Linked {linked_by_dbid} statements to MPs using hansard_db_id (exact matching)")

        # Fall back to name matching for unlinked statements
        total_statements = len(hansard_data["speeches"])
        if linked_by_dbid < total_statements:
            logger.info(f"Attempting name matching for {total_statements - linked_by_dbid} unlinked statements...")
            linked_by_name = link_statements_to_mps_by_name(neo4j, document_id)
            logger.success(f"✓ Linked {linked_by_name} statements to MPs using name matching (fallback)")
    else:
        logger.warning("⚠️ No person_db_id data available, using name matching only")
        linked_by_name = link_statements_to_mps_by_name(neo4j, document_id)
        logger.success(f"✓ Linked {linked_by_name} statements to MPs using name matching")

    # Create SPOKE_AT relationships
    spoke_at_query = """
    MATCH (s:Statement)-[:MADE_BY]->(mp:MP), (s)-[:PART_OF]->(d:Document {id: $doc_id})
    MERGE (mp)-[r:SPOKE_AT]->(d)
    SET r.statement_id = s.id, r.person_db_id = s.person_db_id
    RETURN count(DISTINCT r) as spoke_at_count
    """
    result = neo4j.run_query(spoke_at_query, {"doc_id": document_id})
    spoke_at_count = result[0]['spoke_at_count'] if result else 0
    logger.success(f"✓ Created {spoke_at_count} SPOKE_AT relationships")

    total_linked = linked_by_dbid + linked_by_name
    return len(hansard_data["speeches"]), total_linked


def get_latest_document_id(neo4j: Neo4jClient) -> int:
    """Get the highest document ID currently in the database."""
    result = neo4j.run_query("""
        MATCH (d:Document)
        RETURN max(d.id) as max_id
    """)
    return result[0]['max_id'] if result and result[0]['max_id'] else 25000


def check_and_import_recent_debates(neo4j: Neo4jClient, lookback_days: int = 7, target_month: str = None):
    """Check for and import any missing debates from the last N days or a specific month.

    Args:
        neo4j: Neo4j client
        lookback_days: Number of days to look back (default: 7)
        target_month: Optional month in YYYY-MM format (e.g., '2025-11') to check all weekdays in that month
    """
    imported_count = 0

    # Get dates to check
    dates_to_check = []
    if target_month:
        # Generate all weekdays in the specified month
        year, month = map(int, target_month.split('-'))
        from calendar import monthrange
        days_in_month = monthrange(year, month)[1]
        for day in range(1, days_in_month + 1):
            date = datetime(year, month, day)
            # Skip weekends (House doesn't sit on weekends)
            if date.weekday() < 5:  # Monday = 0, Friday = 4
                dates_to_check.append(date.strftime('%Y-%m-%d'))
        logger.info(f"Checking for debates in {target_month}: {len(dates_to_check)} weekdays")
    else:
        # Original logic: last N days
        for i in range(lookback_days, -1, -1):
            date = datetime.now() - timedelta(days=i)
            # Skip weekends (House doesn't sit on weekends)
            if date.weekday() < 5:  # Monday = 0, Friday = 4
                dates_to_check.append(date.strftime('%Y-%m-%d'))
        logger.info(f"Checking for debates on dates: {dates_to_check}")

    # Check which dates already exist
    result = neo4j.run_query("""
        MATCH (d:Document)
        WHERE d.date IN $dates
        RETURN d.date as date
    """, {"dates": dates_to_check})
    existing_dates = {row['date'] for row in result}
    missing_dates = [d for d in dates_to_check if d not in existing_dates]

    logger.info(f"Already imported: {existing_dates}")
    logger.info(f"Missing dates to check: {missing_dates}")

    # Get the latest sitting number we have to estimate range
    result = neo4j.run_query("""
        MATCH (d:Document)
        WHERE d.number IS NOT NULL
        WITH d.number as num
        ORDER BY d.date DESC
        LIMIT 1
        RETURN num
    """)
    if result and result[0]['num']:
        # Handle both old format ("No. 069") and new format (69)
        num_val = result[0]['num']
        if isinstance(num_val, str):
            latest_sitting = int(num_val.replace('No. ', '').strip())
        else:
            latest_sitting = int(num_val)
        # Search backward and forward: from (latest - 15) to (latest + 15)
        search_range = range(max(1, latest_sitting - 15), latest_sitting + 16)
    else:
        # Fallback: search from sitting 040 to 070
        search_range = range(40, 71)

    logger.info(f"Searching sitting numbers: {search_range.start} to {search_range.stop - 1}")

    # Try each sitting number in range
    for sitting_num in search_range:
        sitting_str = str(sitting_num).zfill(3)
        xml_url = f"https://www.ourcommons.ca/Content/House/451/Debates/{sitting_str}/HAN{sitting_str}-E.XML"

        try:
            # Check if XML exists
            response = requests.head(xml_url, timeout=10)
            if response.status_code != 200:
                continue

            # Fetch and parse to get actual date
            response = requests.get(xml_url, headers={"Accept": "application/xml"})
            response.raise_for_status()
            xml_text = response.content.decode('utf-8-sig')

            hansard_data = parse_hansard_with_enhanced_metadata(xml_text, source_url=xml_url)

            # Parse the verbose date from XML to ISO format
            try:
                # Parse date like "Monday, November 17, 2025" to ISO "2025-11-17"
                parsed_date = datetime.strptime(hansard_data['date'], '%A, %B %d, %Y')
                iso_date = parsed_date.strftime('%Y-%m-%d')

                # Skip if not in our target date range
                if iso_date not in missing_dates:
                    logger.info(f"⏭ Sitting {sitting_str} ({iso_date}) not in missing dates, skipping")
                    continue

                # Skip if already imported (by date check)
                if iso_date in existing_dates:
                    logger.info(f"⏭ Sitting {sitting_str} ({iso_date}) already imported, skipping")
                    continue

                logger.success(f"✓ Found sitting {sitting_str} for missing date {iso_date}")

                # Get next document ID
                latest_doc_id = get_latest_document_id(neo4j)
                document_id = latest_doc_id + 1

                # Import
                stmt_count, linked_count = import_hansard_to_neo4j(
                    neo4j, hansard_data, iso_date, document_id, sitting_str
                )

                logger.success(f"✅ Imported sitting {sitting_str} ({iso_date}): {stmt_count} statements, {linked_count} linked")
                imported_count += 1

                # Mark as imported
                existing_dates.add(iso_date)

            except (ValueError, TypeError) as e:
                logger.warning(f"Could not parse date '{hansard_data['date']}': {e}")

        except requests.exceptions.RequestException as e:
            if hasattr(e.response, 'status_code') and e.response.status_code == 404:
                # 404 is expected for sitting numbers that don't exist
                continue
            logger.error(f"Error fetching {xml_url}: {e}")

    return imported_count


def main():
    """Main entry point for daily import job."""
    import argparse

    parser = argparse.ArgumentParser(description='Daily Hansard import job')
    parser.add_argument('--lookback-days', type=int, default=30,
                        help='Number of days to look back (default: 30)')
    parser.add_argument('--month', type=str, default=None,
                        help='Target month in YYYY-MM format (e.g., 2025-11) to check all weekdays')
    args = parser.parse_args()

    logger.info("=" * 80)
    logger.info("DAILY HANSARD IMPORT JOB")
    logger.info(f"Started at: {datetime.now().isoformat()}")
    logger.info("=" * 80)

    # Get Neo4j connection from environment
    neo4j_uri = os.getenv('NEO4J_URI', 'bolt://10.128.0.3:7687')
    neo4j_user = os.getenv('NEO4J_USERNAME', 'neo4j')
    neo4j_password = os.getenv('NEO4J_PASSWORD')

    if not neo4j_password:
        logger.error("NEO4J_PASSWORD environment variable not set")
        sys.exit(1)

    neo4j = Neo4jClient(uri=neo4j_uri, user=neo4j_user, password=neo4j_password)

    try:
        # Check for debates from specified period
        imported = check_and_import_recent_debates(
            neo4j,
            lookback_days=args.lookback_days,
            target_month=args.month
        )

        # Extract keywords for newly imported documents
        if imported > 0:
            logger.info("")
            logger.info("Extracting keywords for newly imported documents...")
            try:
                keywords_count = extract_hansard_keywords(
                    neo4j_client=neo4j,
                    top_n=20  # Extract top 20 keywords per document
                )
                logger.success(f"✅ Extracted keywords for {keywords_count} document(s)")
            except Exception as e:
                logger.warning(f"⚠️ Keyword extraction failed: {str(e)}")
                # Don't fail the whole job if keyword extraction fails
                pass

        logger.info("=" * 80)
        if imported > 0:
            logger.success(f"✅ Successfully imported {imported} new debate(s)")
        else:
            logger.info("ℹ️ No new debates found")
        logger.info("=" * 80)

    except Exception as e:
        logger.error(f"Job failed with error: {str(e)}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        neo4j.close()


if __name__ == "__main__":
    main()
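
Usage note: main() reads the Neo4j connection from the NEO4J_URI, NEO4J_USERNAME, and NEO4J_PASSWORD environment variables and exposes the --lookback-days and --month flags defined above. Assuming the file sits in a top-level scripts/ directory alongside packages/ (a hypothetical layout implied by the sys.path.insert calls), a scheduled run or a one-off monthly backfill would look roughly like:

    export NEO4J_PASSWORD='...'   # NEO4J_URI and NEO4J_USERNAME fall back to the defaults in main()
    python scripts/daily-hansard-import.py --lookback-days 30
    python scripts/daily-hansard-import.py --month 2025-11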
