FedMCP - Federal Parliamentary Information

hansard.py (35.8 kB)
"""Hansard statements and documents ingestion from OpenParliament PostgreSQL.""" from typing import Optional, Dict, Any from pathlib import Path from datetime import datetime import re import unicodedata from ..utils.neo4j_client import Neo4jClient from ..utils.postgres_client import PostgresClient from ..utils.progress import logger, ProgressTracker from ..utils.keyword_extraction import extract_document_keywords # Data Quality Utilities # ---------------------- def is_valid_date(date_value) -> bool: """ Check if a date is valid (not corrupted). Filters out dates with year > 3000 (likely data entry errors). Args: date_value: Date object or string Returns: True if date is valid, False otherwise """ if date_value is None: return False try: if isinstance(date_value, str): date_obj = datetime.fromisoformat(date_value) else: date_obj = date_value # Filter out corrupted dates (year 4043, etc.) return 1900 <= date_obj.year <= 3000 except (ValueError, AttributeError): return False def strip_html_tags(text: Optional[str]) -> Optional[str]: """ Remove HTML tags from text content. Preserves line breaks by converting <p> and <br> to newlines. Args: text: HTML text content Returns: Clean text without HTML tags """ if not text: return text # Convert paragraphs and breaks to newlines text = re.sub(r'<p[^>]*>', '\n\n', text) text = re.sub(r'</p>', '', text) text = re.sub(r'<br\s*/?\s*>', '\n', text, flags=re.IGNORECASE) # Remove all other HTML tags text = re.sub(r'<[^>]+>', '', text) # Decode common HTML entities text = text.replace('&amp;', '&') text = text.replace('&lt;', '<') text = text.replace('&gt;', '>') text = text.replace('&quot;', '"') text = text.replace('&#39;', "'") text = text.replace('&nbsp;', ' ') # Clean up excessive whitespace text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text) # Max 2 consecutive newlines text = re.sub(r' +', ' ', text) # Multiple spaces to single space return text.strip() def sanitize_statement_content(statement_data: Dict[str, Any]) -> Dict[str, Any]: """ Sanitize statement data for display. Removes HTML tags, validates dates, and cleans up content. 

def sanitize_statement_content(statement_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Sanitize statement data for display.

    Removes HTML tags, validates dates, and cleans up content.

    Args:
        statement_data: Raw statement data dictionary

    Returns:
        Sanitized statement data dictionary
    """
    # Strip HTML from content fields
    if 'content_en' in statement_data:
        statement_data['content_en'] = strip_html_tags(statement_data['content_en'])
    if 'content_fr' in statement_data:
        statement_data['content_fr'] = strip_html_tags(statement_data['content_fr'])

    # Strip HTML from header fields
    for field in ['h1_en', 'h2_en', 'h3_en', 'h1_fr', 'h2_fr', 'h3_fr']:
        if field in statement_data and statement_data[field]:
            statement_data[field] = strip_html_tags(statement_data[field])

    # Validate and filter date
    if 'time' in statement_data:
        if not is_valid_date(statement_data['time']):
            logger.warning(f"Invalid date for statement {statement_data.get('id')}: {statement_data['time']}")
            statement_data['time'] = None

    return statement_data


# Name Matching Utilities (for linking statements to MPs by speaker name)
# -----------------------------------------------------------------------

# Common nickname mappings for Canadian MPs
NICKNAME_MAPPING = {
    'bobby': 'robert',
    'rob': 'robert',
    'bob': 'robert',
    'bill': 'william',
    'dick': 'richard',
    'jim': 'james',
    'joe': 'joseph',
    'mike': 'michael',
    'tony': 'anthony',
    'shuv': 'shuvaloy',
}


def normalize_name(name: str) -> str:
    """
    Normalize a name for fuzzy matching by:
    - Removing accents/diacritics (é → e, è → e)
    - Converting to lowercase
    - Removing extra whitespace
    - Removing punctuation like periods

    Args:
        name: Name to normalize

    Returns:
        Normalized name string
    """
    if not name:
        return ""

    # Remove accents: é → e, è → e, ñ → n, etc.
    # NFD decomposes characters into base + combining characters
    # Then filter out combining characters
    name = ''.join(
        char for char in unicodedata.normalize('NFD', name)
        if unicodedata.category(char) != 'Mn'
    )

    # Remove periods (for middle initials like "S." or "A.")
    name = name.replace('.', '')

    # Convert to lowercase and strip whitespace
    name = name.lower().strip()

    # Normalize whitespace
    name = ' '.join(name.split())

    return name


def extract_core_name(given_name: str, family_name: str) -> str:
    """
    Extract core first and last name, removing middle names/initials.

    Args:
        given_name: Given/first name (may include middle names/initials)
        family_name: Family/last name

    Returns:
        "FirstName LastName" with middle names removed
    """
    # Get first word from given name (removes middle names/initials)
    first_only = given_name.split()[0] if given_name else ""

    # Get first word from family name (handles hyphenated surnames)
    # e.g., "Fancy-Landry" → "Fancy"
    last_first = family_name.split()[0].split('-')[0] if family_name else ""

    return f"{first_only} {last_first}".strip()
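
# Example (illustrative sketch): how the name helpers above normalize typical
# speaker strings before lookup in the MP name map.
#
#   >>> normalize_name("Stéphane Bergeron")
#   'stephane bergeron'
#   >>> extract_core_name("Amanpreet S.", "Gill")
#   'Amanpreet Gill'
#   >>> NICKNAME_MAPPING.get(normalize_name("Bobby"))   # nickname -> formal name
#   'robert'
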

def ingest_hansard_documents(
    neo4j_client: Neo4jClient,
    postgres_client: PostgresClient,
    batch_size: int = 1000,
    limit: Optional[int] = None,
) -> int:
    """
    Ingest Hansard documents from PostgreSQL to Neo4j.

    Documents group statements into debate sessions or committee evidence sessions.

    Args:
        neo4j_client: Neo4j client instance
        postgres_client: PostgreSQL client instance
        batch_size: Batch size for Neo4j operations
        limit: Optional limit for sample imports (None = all documents)

    Returns:
        Number of documents created
    """
    logger.info("Ingesting Hansard documents from PostgreSQL...")

    # Build query
    query = """
        SELECT id, date, number, session_id, document_type,
               source_id, downloaded, public, xml_source_url
        FROM hansards_document
        ORDER BY date DESC
    """
    if limit:
        query += f" LIMIT {limit}"

    # Fetch documents
    documents = postgres_client.execute_query(query)
    logger.info(f"Fetched {len(documents):,} Hansard documents from PostgreSQL")

    if not documents:
        logger.warning("No Hansard documents found")
        return 0

    # Prepare data for batch insert
    documents_data = []
    for doc in documents:
        documents_data.append({
            "id": doc["id"],
            "date": doc["date"].isoformat() if doc["date"] else None,
            "number": doc["number"],
            "session_id": doc["session_id"],
            "document_type": doc["document_type"],  # D=debates, E=evidence
            "source_id": doc["source_id"],
            "downloaded": doc["downloaded"],
            "public": doc["public"],
            "xml_source_url": doc["xml_source_url"],
        })

    # Create nodes in Neo4j
    tracker = ProgressTracker(total=len(documents_data), desc="Creating Document nodes")

    # Use UNWIND for efficient batch insert
    cypher = """
        UNWIND $documents AS doc
        MERGE (d:Document {id: doc.id})
        SET d.date = doc.date,
            d.number = doc.number,
            d.session_id = doc.session_id,
            d.document_type = doc.document_type,
            d.source_id = doc.source_id,
            d.downloaded = doc.downloaded,
            d.public = doc.public,
            d.xml_source_url = doc.xml_source_url,
            d.updated_at = datetime()
        RETURN count(d) as created
    """

    # Process in batches
    created_total = 0
    for i in range(0, len(documents_data), batch_size):
        batch = documents_data[i:i + batch_size]
        result = neo4j_client.run_query(cypher, {"documents": batch})
        created = result[0]["created"] if result else 0
        created_total += created
        tracker.update(len(batch))

    tracker.close()
    logger.info(f"Created {created_total:,} Document nodes in Neo4j")
    return created_total
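
# Verification sketch (assumption: run manually in the Neo4j browser or via the
# client after ingestion; not invoked by the pipeline). Checks that Document
# nodes landed and that dates were stored as ISO strings.
#
#   MATCH (d:Document)
#   RETURN d.document_type AS type, count(d) AS docs, max(d.date) AS latest
#   ORDER BY type
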

def ingest_hansard_statements(
    neo4j_client: Neo4jClient,
    postgres_client: PostgresClient,
    batch_size: int = 5000,
    limit: Optional[int] = None,
) -> int:
    """
    Ingest Hansard statements from PostgreSQL to Neo4j.

    Statements are individual speeches/interventions by MPs in debates or committees.

    Args:
        neo4j_client: Neo4j client instance
        postgres_client: PostgreSQL client instance
        batch_size: Batch size for Neo4j operations (larger for statements)
        limit: Optional limit for sample imports (None = all statements)

    Returns:
        Number of statements created
    """
    logger.info("Ingesting Hansard statements from PostgreSQL...")

    # Build query - get most recent statements first
    query = """
        SELECT id, document_id, time, politician_id, member_id,
               who_en, who_fr, content_en, content_fr,
               h1_en, h1_fr, h2_en, h2_fr, h3_en, h3_fr,
               statement_type, wordcount, procedural,
               bill_debated_id, bill_debate_stage, slug
        FROM hansards_statement
        ORDER BY time DESC
    """
    if limit:
        query += f" LIMIT {limit}"

    # Fetch statements
    logger.info("Fetching statements from PostgreSQL...")
    statements = postgres_client.execute_query(query)
    logger.info(f"Fetched {len(statements):,} Hansard statements from PostgreSQL")

    if not statements:
        logger.warning("No Hansard statements found")
        return 0

    # Prepare data for batch insert
    statements_data = []
    for stmt in statements:
        # Build raw statement data
        statement_data = {
            "id": stmt["id"],
            "document_id": stmt["document_id"],
            "time": stmt["time"],  # Don't convert yet, sanitize_statement_content handles it
            "politician_id": stmt["politician_id"],
            "member_id": stmt["member_id"],
            "who_en": stmt["who_en"],
            "who_fr": stmt["who_fr"],
            "content_en": stmt["content_en"] or "",
            "content_fr": stmt["content_fr"] or "",
            "h1_en": stmt["h1_en"],
            "h1_fr": stmt["h1_fr"],
            "h2_en": stmt["h2_en"],
            "h2_fr": stmt["h2_fr"],
            "h3_en": stmt["h3_en"],
            "h3_fr": stmt["h3_fr"],
            "statement_type": stmt["statement_type"],
            "wordcount": stmt["wordcount"],
            "procedural": stmt["procedural"],
            "bill_debated_id": stmt["bill_debated_id"],
            "bill_debate_stage": stmt["bill_debate_stage"],
            "slug": stmt["slug"],
        }

        # Sanitize content (strip HTML, validate dates)
        statement_data = sanitize_statement_content(statement_data)

        # Convert time to ISO format after sanitization
        if statement_data["time"]:
            statement_data["time"] = statement_data["time"].isoformat()

        statements_data.append(statement_data)

    # Create nodes in Neo4j
    tracker = ProgressTracker(total=len(statements_data), desc="Creating Statement nodes")

    # Use UNWIND for efficient batch insert
    cypher = """
        UNWIND $statements AS stmt
        MERGE (s:Statement {id: stmt.id})
        SET s.document_id = stmt.document_id,
            s.time = stmt.time,
            s.politician_id = stmt.politician_id,
            s.member_id = stmt.member_id,
            s.who_en = stmt.who_en,
            s.who_fr = stmt.who_fr,
            s.content_en = stmt.content_en,
            s.content_fr = stmt.content_fr,
            s.h1_en = stmt.h1_en,
            s.h1_fr = stmt.h1_fr,
            s.h2_en = stmt.h2_en,
            s.h2_fr = stmt.h2_fr,
            s.h3_en = stmt.h3_en,
            s.h3_fr = stmt.h3_fr,
            s.statement_type = stmt.statement_type,
            s.wordcount = stmt.wordcount,
            s.procedural = stmt.procedural,
            s.bill_debated_id = stmt.bill_debated_id,
            s.bill_debate_stage = stmt.bill_debate_stage,
            s.slug = stmt.slug,
            s.updated_at = datetime()
        RETURN count(s) as created
    """

    # Process in batches
    created_total = 0
    for i in range(0, len(statements_data), batch_size):
        batch = statements_data[i:i + batch_size]
        result = neo4j_client.run_query(cypher, {"statements": batch})
        created = result[0]["created"] if result else 0
        created_total += created
        tracker.update(len(batch))

    tracker.close()
    logger.info(f"Created {created_total:,} Statement nodes in Neo4j")
    return created_total
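
# Batch sizing note (illustrative): with the default batch_size of 5000, a full
# import of roughly 3.67M statements runs ceil(3_670_000 / 5000) = 734 UNWIND
# batches; the final slice simply carries the remainder, e.g.
#   list(range(0, 12, 5)) -> [0, 5, 10]   # slices of length 5, 5, 2
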

def link_statements_to_mps(
    neo4j_client: Neo4jClient,
    batch_size: int = 10000,
) -> int:
    """
    Create MADE_BY relationships between Statements and MPs.

    Links statements to the MPs who made them using politician_id.
    Matches Statement.politician_id (OpenParliament ID) to MP.openparliament_politician_id.

    Args:
        neo4j_client: Neo4j client instance
        batch_size: Batch size for relationship creation

    Returns:
        Number of relationships created
    """
    logger.info("Linking statements to MPs...")

    cypher = """
        MATCH (s:Statement)
        WHERE s.politician_id IS NOT NULL
          AND NOT exists((s)-[:MADE_BY]->())
        WITH s LIMIT $batch_size
        MATCH (mp:MP)
        WHERE mp.openparliament_politician_id = s.politician_id
        MERGE (s)-[:MADE_BY]->(mp)
        RETURN count(*) as created
    """

    total_created = 0
    while True:
        result = neo4j_client.run_query(cypher, {"batch_size": batch_size})
        created = result[0]["created"] if result else 0
        if created == 0:
            break
        total_created += created
        logger.info(f"Progress: {total_created:,} MADE_BY relationships created")

    logger.info(f"Created {total_created:,} MADE_BY relationships")
    return total_created
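
# Verification sketch (assumption: run manually after ID-based linking). Counts
# statements that still carry a politician_id but no MADE_BY relationship,
# i.e. candidates for the name-based fallback below.
#
#   MATCH (s:Statement)
#   WHERE s.politician_id IS NOT NULL AND NOT (s)-[:MADE_BY]->()
#   RETURN count(s) AS unlinked
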
Gill" -> "amanpreet gill" core_name = normalize_name(extract_core_name(given, family)) if core_name: mp_mapping[core_name] = mp_id # Store with nickname variations first_name = given.split()[0] if given else "" first_normalized = normalize_name(first_name) if first_normalized in NICKNAME_MAPPING: # e.g., "Bobby Morrissey" also maps as "Robert Morrissey" formal_name = normalize_name(f"{NICKNAME_MAPPING[first_normalized]} {family}") mp_mapping[formal_name] = mp_id # Also store core version with formal name formal_core = normalize_name(f"{NICKNAME_MAPPING[first_normalized]} {family.split()[0].split('-')[0]}") mp_mapping[formal_core] = mp_id # For compound last names, also store with just the first part # e.g., "Michelle Rempel Garner" -> "Michelle Rempel" if given and family and " " in family: first_family = family.split()[0] mp_mapping[normalize_name(f"{given} {first_family}")] = mp_id # For hyphenated last names, also store first part only # e.g., "Jessica Fancy-Landry" -> "Jessica Fancy" if given and family and "-" in family: first_part = family.split('-')[0] mp_mapping[normalize_name(f"{given} {first_part}")] = mp_id # Also core version mp_mapping[normalize_name(f"{given.split()[0]} {first_part}")] = mp_id logger.info(f"Built {len(mp_mapping):,} MP name variations") # Get statements that need linking if document_id: statements_query = """ MATCH (s:Statement)-[:PART_OF]->(d:Document {id: $document_id}) WHERE s.who_en IS NOT NULL AND NOT exists((s)-[:MADE_BY]->()) AND s.who_en <> 'The Speaker' AND s.who_en <> 'The Deputy Speaker' AND s.who_en <> 'The Chair' AND s.who_en <> 'The Acting Speaker' RETURN s.id as statement_id, s.who_en as speaker_name """ statements = neo4j_client.run_query(statements_query, {"document_id": document_id}) else: statements_query = """ MATCH (s:Statement) WHERE s.who_en IS NOT NULL AND NOT exists((s)-[:MADE_BY]->()) AND s.who_en <> 'The Speaker' AND s.who_en <> 'The Deputy Speaker' AND s.who_en <> 'The Chair' AND s.who_en <> 'The Acting Speaker' RETURN s.id as statement_id, s.who_en as speaker_name LIMIT 10000 """ statements = neo4j_client.run_query(statements_query) logger.info(f"Found {len(statements):,} statements to link") if not statements: return 0 # Match statements to MPs matched_pairs = [] unmatched = [] for stmt in statements: statement_id = stmt["statement_id"] speaker_name = stmt["speaker_name"] if not speaker_name: continue # Strip honorifics from speaker name clean_name = speaker_name honorifics = ["Right Hon.", "Rt. 
Hon.", "Hon.", "Dr.", "Rev.", "Prof.", "Mr.", "Mrs.", "Ms.", "Miss"] for honorific in honorifics: clean_name = clean_name.replace(honorific, "").strip() # Normalize the name normalized_name = normalize_name(clean_name) # Look up MP ID mp_id = mp_mapping.get(normalized_name) # If not found, try variations if not mp_id and " " in normalized_name: parts = normalized_name.split() # Try: first name + first word of last name if len(parts) >= 2: first_last = f"{parts[0]} {parts[1]}" mp_id = mp_mapping.get(first_last) # Try: extracting just first + last (no middle names) if not mp_id and len(parts) >= 2: core_name = f"{parts[0]} {parts[-1]}" mp_id = mp_mapping.get(core_name) if mp_id: matched_pairs.append({ "statement_id": statement_id, "mp_id": mp_id, }) else: unmatched.append(speaker_name) logger.info(f"Matched {len(matched_pairs):,} statements to MPs") if unmatched: logger.warning(f"Could not match {len(unmatched)} speaker names:") for name in list(set(unmatched))[:10]: # Show first 10 unique unmatched names logger.warning(f" - {name}") # Create relationships in batches if not matched_pairs: return 0 cypher = """ UNWIND $pairs AS pair MATCH (s:Statement {id: pair.statement_id}) MATCH (mp:MP {id: pair.mp_id}) MERGE (s)-[:MADE_BY]->(mp) RETURN count(*) as created """ result = neo4j_client.run_query(cypher, {"pairs": matched_pairs}) created = result[0]["created"] if result else 0 logger.success(f"✅ Created {created:,} MADE_BY relationships by name matching") return created def link_statements_to_documents( neo4j_client: Neo4jClient, batch_size: int = 10000, ) -> int: """ Create PART_OF relationships between Statements and Documents. Args: neo4j_client: Neo4j client instance batch_size: Batch size for relationship creation Returns: Number of relationships created """ logger.info("Linking statements to documents...") cypher = """ MATCH (s:Statement) WHERE s.document_id IS NOT NULL AND NOT exists((s)-[:PART_OF]->()) WITH s LIMIT $batch_size MATCH (d:Document {id: s.document_id}) MERGE (s)-[:PART_OF]->(d) RETURN count(*) as created """ total_created = 0 while True: result = neo4j_client.run_query(cypher, {"batch_size": batch_size}) created = result[0]["created"] if result else 0 if created == 0: break total_created += created logger.info(f"Progress: {total_created:,} PART_OF relationships created") logger.info(f"Created {total_created:,} PART_OF relationships") return total_created def link_statements_to_bills( neo4j_client: Neo4jClient, batch_size: int = 10000, ) -> int: """ Create MENTIONS relationships between Statements and Bills. Links statements to bills they debated or mentioned. Matches Statement.bill_debated_id (OpenParliament ID) to Bill.openparliament_bill_id. 

def link_statements_to_documents(
    neo4j_client: Neo4jClient,
    batch_size: int = 10000,
) -> int:
    """
    Create PART_OF relationships between Statements and Documents.

    Args:
        neo4j_client: Neo4j client instance
        batch_size: Batch size for relationship creation

    Returns:
        Number of relationships created
    """
    logger.info("Linking statements to documents...")

    cypher = """
        MATCH (s:Statement)
        WHERE s.document_id IS NOT NULL
          AND NOT exists((s)-[:PART_OF]->())
        WITH s LIMIT $batch_size
        MATCH (d:Document {id: s.document_id})
        MERGE (s)-[:PART_OF]->(d)
        RETURN count(*) as created
    """

    total_created = 0
    while True:
        result = neo4j_client.run_query(cypher, {"batch_size": batch_size})
        created = result[0]["created"] if result else 0
        if created == 0:
            break
        total_created += created
        logger.info(f"Progress: {total_created:,} PART_OF relationships created")

    logger.info(f"Created {total_created:,} PART_OF relationships")
    return total_created


def link_statements_to_bills(
    neo4j_client: Neo4jClient,
    batch_size: int = 10000,
) -> int:
    """
    Create MENTIONS relationships between Statements and Bills.

    Links statements to bills they debated or mentioned.
    Matches Statement.bill_debated_id (OpenParliament ID) to Bill.openparliament_bill_id.

    Args:
        neo4j_client: Neo4j client instance
        batch_size: Batch size for relationship creation

    Returns:
        Number of relationships created
    """
    logger.info("Linking statements to bills...")

    cypher = """
        MATCH (s:Statement)
        WHERE s.bill_debated_id IS NOT NULL
          AND NOT exists((s)-[:MENTIONS]->())
        WITH s LIMIT $batch_size
        MATCH (b:Bill)
        WHERE b.openparliament_bill_id = s.bill_debated_id
        MERGE (s)-[r:MENTIONS]->(b)
        SET r.debate_stage = s.bill_debate_stage
        RETURN count(*) as created
    """

    total_created = 0
    while True:
        result = neo4j_client.run_query(cypher, {"batch_size": batch_size})
        created = result[0]["created"] if result else 0
        if created == 0:
            break
        total_created += created
        logger.info(f"Progress: {total_created:,} MENTIONS relationships created")

    logger.info(f"Created {total_created:,} MENTIONS relationships")
    return total_created


def create_hansard_indexes(neo4j_client: Neo4jClient) -> None:
    """
    Create indexes for efficient Hansard queries.

    Creates:
    - Full-text indexes on Statement.content_en and Statement.content_fr
    - Regular indexes on Statement(document_id, time)
    - Index on Document.date

    Args:
        neo4j_client: Neo4j client instance
    """
    logger.info("Creating Hansard indexes...")

    # Full-text index on English content
    try:
        neo4j_client.run_query("""
            CREATE FULLTEXT INDEX statement_content_en IF NOT EXISTS
            FOR (s:Statement) ON EACH [s.content_en]
        """)
        logger.info("Created full-text index on Statement.content_en")
    except Exception as e:
        logger.warning(f"Could not create statement_content_en index: {e}")

    # Full-text index on French content
    try:
        neo4j_client.run_query("""
            CREATE FULLTEXT INDEX statement_content_fr IF NOT EXISTS
            FOR (s:Statement) ON EACH [s.content_fr]
        """)
        logger.info("Created full-text index on Statement.content_fr")
    except Exception as e:
        logger.warning(f"Could not create statement_content_fr index: {e}")

    # Composite index for statement lookups by document and time
    try:
        neo4j_client.run_query("""
            CREATE INDEX statement_document_time IF NOT EXISTS
            FOR (s:Statement) ON (s.document_id, s.time)
        """)
        logger.info("Created composite index on Statement(document_id, time)")
    except Exception as e:
        logger.warning(f"Could not create statement_document_time index: {e}")

    # Index on document date
    try:
        neo4j_client.run_query("""
            CREATE INDEX document_date IF NOT EXISTS
            FOR (d:Document) ON (d.date)
        """)
        logger.info("Created index on Document.date")
    except Exception as e:
        logger.warning(f"Could not create document_date index: {e}")

    logger.info("Hansard indexes created successfully")
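
# Search sketch (assumption: run after create_hansard_indexes; this is the
# stock Neo4j full-text procedure, not a helper defined in this module).
#
#   CALL db.index.fulltext.queryNodes("statement_content_en", "carbon tax")
#   YIELD node, score
#   RETURN node.who_en AS speaker, node.time AS time, score
#   ORDER BY score DESC LIMIT 10
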

def extract_hansard_keywords(
    neo4j_client: Neo4jClient,
    session_id: Optional[str] = None,
    limit: Optional[int] = None,
    top_n: int = 20,
    skip_existing: bool = True
) -> int:
    """
    Extract and populate keywords for Hansard documents using TF-IDF.

    Processes documents by session to build proper corpus for keyword weighting.

    Args:
        neo4j_client: Neo4j client instance
        session_id: Optional specific session to process (e.g., "45-1")
        limit: Optional limit for number of documents to process
        top_n: Number of keywords to extract per document
        skip_existing: If True, skip documents that already have keywords (default: True)

    Returns:
        Number of documents updated with keywords
    """
    logger.info("Extracting keywords for Hansard documents...")

    # Get sessions to process
    if session_id:
        sessions_query = """
            MATCH (d:Document {session_id: $session_id})
            WHERE d.public = true
            RETURN DISTINCT d.session_id as session_id
        """
        sessions = neo4j_client.run_query(sessions_query, {"session_id": session_id})
    else:
        sessions_query = """
            MATCH (d:Document)
            WHERE d.public = true AND d.session_id IS NOT NULL
            RETURN DISTINCT d.session_id as session_id
            ORDER BY session_id DESC
        """
        sessions = neo4j_client.run_query(sessions_query)

    total_updated = 0

    for session_row in sessions:
        session = session_row['session_id']
        logger.info(f"Processing session {session}...")

        # Get all documents in session
        docs_query = """
            MATCH (d:Document {session_id: $session_id})
            WHERE d.public = true
            RETURN d.id as doc_id
            ORDER BY d.date DESC
        """
        if limit and not session_id:
            docs_query += f" LIMIT {limit}"

        documents = neo4j_client.run_query(docs_query, {"session_id": session})
        if not documents:
            continue

        doc_ids = [doc['doc_id'] for doc in documents]

        # Identify which documents need keyword extraction
        if skip_existing:
            needs_keywords_query = """
                MATCH (d:Document {session_id: $session_id})
                WHERE d.public = true
                  AND (d.keywords_en IS NULL OR d.keywords_en = [])
                RETURN d.id as doc_id
            """
            needs_keywords_result = neo4j_client.run_query(needs_keywords_query, {"session_id": session})
            docs_needing_keywords = {doc['doc_id'] for doc in needs_keywords_result}

            if not docs_needing_keywords:
                logger.info(f"  All {len(doc_ids)} documents already have keywords, skipping session")
                continue

            logger.info(f"  {len(docs_needing_keywords)} of {len(doc_ids)} documents need keywords")
        else:
            docs_needing_keywords = set(doc_ids)
            logger.info(f"  Processing {len(doc_ids)} documents in session {session}")

        # Get all statement text for corpus (use ALL docs for TF-IDF quality)
        corpus_query = """
            MATCH (d:Document)<-[:PART_OF]-(s:Statement)
            WHERE d.id IN $doc_ids AND s.procedural = false
            RETURN d.id as doc_id,
                   collect(COALESCE(s.content_en, '')) as contents_en,
                   collect(COALESCE(s.content_fr, '')) as contents_fr
        """
        result = neo4j_client.run_query(corpus_query, {"doc_ids": doc_ids})

        # Build document texts and corpus
        doc_texts = {}
        corpus_en = []
        corpus_fr = []

        for row in result:
            text_en = ' '.join([c for c in row['contents_en'] if c])
            text_fr = ' '.join([c for c in row['contents_fr'] if c])

            doc_texts[row['doc_id']] = {
                'text_en': text_en,
                'text_fr': text_fr
            }

            if text_en:
                corpus_en.append({'text': text_en})
            if text_fr:
                corpus_fr.append({'text': text_fr})

        logger.info(f"  Corpus: {len(corpus_en)} EN docs, {len(corpus_fr)} FR docs")

        # Filter to only documents that need keywords
        docs_to_process = {doc_id: texts for doc_id, texts in doc_texts.items()
                           if doc_id in docs_needing_keywords}

        if not docs_to_process:
            logger.info("  No documents need keyword extraction")
            continue

        # Extract keywords for documents that need them
        tracker = ProgressTracker(
            total=len(docs_to_process),
            desc=f"Extracting keywords ({session})"
        )

        for doc_id, texts in docs_to_process.items():
            # Extract keywords (using full corpus for TF-IDF quality)
            keywords_en, keywords_fr = extract_document_keywords(
                texts['text_en'],
                texts['text_fr'],
                corpus_en,
                corpus_fr,
                top_n=top_n
            )

            # Update document
            neo4j_client.run_query("""
                MATCH (d:Document {id: $doc_id})
                SET d.keywords_en = $keywords_en,
                    d.keywords_fr = $keywords_fr,
                    d.updated_at = datetime()
            """, {
                "doc_id": doc_id,
                "keywords_en": keywords_en,
                "keywords_fr": keywords_fr
            })

            total_updated += 1
            tracker.update(1)

        tracker.close()

    logger.success(f"✅ Extracted keywords for {total_updated} documents")
    return total_updated
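
# Readback sketch (assumption: run manually once keywords are populated) to
# spot-check the TF-IDF output stored on Document nodes.
#
#   MATCH (d:Document)
#   WHERE d.keywords_en IS NOT NULL AND d.keywords_en <> []
#   RETURN d.id, d.date, d.keywords_en[..5] AS top_keywords
#   ORDER BY d.date DESC LIMIT 5
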

def ingest_hansard_sample(
    neo4j_client: Neo4jClient,
    postgres_client: PostgresClient,
    statement_limit: int = 1000,
) -> Dict[str, int]:
    """
    Import a sample of Hansard data for testing.

    Args:
        neo4j_client: Neo4j client instance
        postgres_client: PostgreSQL client instance
        statement_limit: Number of recent statements to import

    Returns:
        Dictionary with counts of created nodes and relationships
    """
    logger.info("=" * 80)
    logger.info(f"HANSARD SAMPLE IMPORT ({statement_limit:,} statements)")
    logger.info("=" * 80)

    results = {}

    # Get unique document IDs from sample statements
    # First get the most recent statements, then extract unique document IDs
    query = f"""
        SELECT document_id, time
        FROM hansards_statement
        ORDER BY time DESC
        LIMIT {statement_limit}
    """
    statements = postgres_client.execute_query(query)

    # Extract unique document IDs
    document_ids = list(set(row["document_id"] for row in statements))
    logger.info(f"Sample covers {len(document_ids)} documents")

    # Fetch the documents covered by the sample (informational; the full
    # document table is imported below, since documents are few relative to statements)
    doc_query = f"""
        SELECT id, date, number, session_id, document_type,
               source_id, downloaded, public, xml_source_url
        FROM hansards_document
        WHERE id IN ({','.join(map(str, document_ids))})
    """
    documents = postgres_client.execute_query(doc_query)

    # Create documents
    results["documents"] = ingest_hansard_documents(
        neo4j_client,
        postgres_client,
        limit=None,
    )

    # Import statements
    results["statements"] = ingest_hansard_statements(
        neo4j_client,
        postgres_client,
        limit=statement_limit,
    )

    # Create relationships
    results["made_by_links"] = link_statements_to_mps(neo4j_client)
    results["part_of_links"] = link_statements_to_documents(neo4j_client)
    results["mentions_links"] = link_statements_to_bills(neo4j_client)

    # Create indexes
    create_hansard_indexes(neo4j_client)

    logger.info("=" * 80)
    logger.info("HANSARD SAMPLE IMPORT COMPLETE")
    logger.info(f"Documents: {results['documents']:,}")
    logger.info(f"Statements: {results['statements']:,}")
    logger.info(f"MP links: {results['made_by_links']:,}")
    logger.info(f"Document links: {results['part_of_links']:,}")
    logger.info(f"Bill mentions: {results['mentions_links']:,}")
    logger.info("=" * 80)

    return results
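
# Usage sketch (assumption: Neo4jClient and PostgresClient are constructed by
# the surrounding application with its own connection settings; their
# constructors are not shown here).
#
#   results = ingest_hansard_sample(neo4j_client, postgres_client, statement_limit=1000)
#   print(results)
#   # keys: documents, statements, made_by_links, part_of_links, mentions_links
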

def ingest_hansard_full(
    neo4j_client: Neo4jClient,
    postgres_client: PostgresClient,
) -> Dict[str, int]:
    """
    Import ALL Hansard data from PostgreSQL to Neo4j.

    This is a long-running operation (2-3 hours for 3.67M statements).

    Args:
        neo4j_client: Neo4j client instance
        postgres_client: PostgreSQL client instance

    Returns:
        Dictionary with counts of created nodes and relationships
    """
    logger.info("=" * 80)
    logger.info("HANSARD FULL IMPORT (ALL DATA)")
    logger.info("WARNING: This will take 2-3 hours for 3.67M statements")
    logger.info("=" * 80)

    results = {}

    # Import all documents
    results["documents"] = ingest_hansard_documents(
        neo4j_client,
        postgres_client,
        limit=None,
    )

    # Import all statements
    results["statements"] = ingest_hansard_statements(
        neo4j_client,
        postgres_client,
        limit=None,
    )

    # Create relationships
    results["made_by_links"] = link_statements_to_mps(neo4j_client)
    results["part_of_links"] = link_statements_to_documents(neo4j_client)
    results["mentions_links"] = link_statements_to_bills(neo4j_client)

    # Create indexes (will take 30-60 minutes for full-text indexes)
    create_hansard_indexes(neo4j_client)

    logger.info("=" * 80)
    logger.info("HANSARD FULL IMPORT COMPLETE")
    logger.info(f"Documents: {results['documents']:,}")
    logger.info(f"Statements: {results['statements']:,}")
    logger.info(f"MP links: {results['made_by_links']:,}")
    logger.info(f"Document links: {results['part_of_links']:,}")
    logger.info(f"Bill mentions: {results['mentions_links']:,}")
    logger.info("=" * 80)

    return results
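
# Graph query sketch (assumption: run after a full or sample import; MP names
# depend on your data). Pulls the most recent interventions with their debate
# document and any bill they address.
#
#   MATCH (mp:MP)<-[:MADE_BY]-(s:Statement)-[:PART_OF]->(d:Document)
#   OPTIONAL MATCH (s)-[m:MENTIONS]->(b:Bill)
#   RETURN mp.name, d.date, s.h2_en, b.openparliament_bill_id, m.debate_stage
#   ORDER BY s.time DESC LIMIT 10
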
