Skip to main content
Glama
northernvariables

FedMCP - Federal Parliamentary Information

parliament.py38.4 kB
"""Parliamentary data ingestion: MPs, bills, votes, debates, committees."""

import json
import re
import sys
import unicodedata
from datetime import datetime
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

import requests

# Add fedmcp package to path
FEDMCP_PATH = Path(__file__).parent.parent.parent.parent / "fedmcp" / "src"
sys.path.insert(0, str(FEDMCP_PATH))

from fedmcp.clients.openparliament import OpenParliamentClient
from fedmcp.clients.legisinfo import LegisInfoClient
from fedmcp.clients.ourcommons_mps import OurCommonsMPsClient

from ..utils.neo4j_client import Neo4jClient
from ..utils.progress import logger, ProgressTracker, batch_iterator


# Directory holding the static JSON lookup files used by this module.
_DATA_DIR = Path(__file__).parent.parent / "data"

# Base URL used to turn relative OpenParliament image paths into full URLs.
_OPENPARLIAMENT_BASE_URL = "https://www.openparliament.ca"

# Titles stripped from LEGISinfo sponsor names before matching against MP names.
# Longer titles come first so "Right Hon. " is not reduced to "Right " by "Hon. ".
_SPONSOR_TITLES = (
    "Right Hon. ",
    "Rt. Hon. ",
    "Hon. ",
    "Sen. ",
    "Mr. ",
    "Mrs. ",
    "Ms. ",
    "Dr. ",
)

# Honorifics stripped by normalize_mp_name(); longest first for the same reason.
_NAME_HONORIFICS = ("Right Hon.", "Rt. Hon.", "Hon.", "Dr.", "Mrs.", "Mr.", "Ms.")

# Matches a legacy "bill_number-session" key whose session looks like "45-1".
_BILL_KEY_RE = re.compile(r"^(?P<number>.+)-(?P<session>\d+-\d+)$")


def _english(value: Any) -> Any:
    """Return the "en" entry of a localized dict, or *value* itself otherwise."""
    if isinstance(value, dict):
        return value.get("en")
    return value


def _first(values: Any) -> Any:
    """Return the first element of a sequence, or None when it is empty/missing."""
    return values[0] if values else None


def _collect(items: Iterable[Any], limit: Optional[int]) -> List[Any]:
    """Materialize *items*, stopping after *limit* entries when a limit is given."""
    collected = []
    for i, item in enumerate(items):
        collected.append(item)
        if limit and i >= limit - 1:
            break
    return collected


@lru_cache(maxsize=1)
def _load_province_mapping() -> Optional[Dict[str, Any]]:
    """Load the riding-to-province mapping once; return None if the file is missing."""
    data_file = _DATA_DIR / "riding_provinces.json"
    if not data_file.exists():
        logger.warning(f"Province mapping file not found: {data_file}")
        return None

    # Riding names contain non-ASCII characters, so never rely on the locale default.
    with open(data_file, "r", encoding="utf-8") as f:
        return json.load(f)


def detect_province(riding_name: str) -> Optional[str]:
    """
    Detect the province/territory for a riding using keyword matching.

    The mapping file is read once and cached for the life of the process.

    Args:
        riding_name: Name of the riding (e.g., "Calgary Centre", "Toronto—Danforth")

    Returns:
        Two-letter province/territory code (e.g., "ON", "QC", "BC") or None if not detected
    """
    mapping = _load_province_mapping()
    if mapping is None:
        return None

    riding_lower = riding_name.lower()

    # Check manual overrides first (for ridings like "Nunavut", "Northwest Territories")
    for override_name, province_code in mapping.get("manual_overrides", {}).items():
        if override_name.lower() in riding_lower:
            return province_code

    # Check keyword patterns
    for province_code, keywords in mapping.get("keyword_patterns", {}).items():
        if any(keyword.lower() in riding_lower for keyword in keywords):
            return province_code

    # No match found
    logger.warning(f"Could not detect province for riding: {riding_name}")
    return None


def _absolute_photo_url(image: Optional[str]) -> Optional[str]:
    """Convert a relative OpenParliament image path (e.g. "polpics/name.jpg") to a full URL."""
    if image and not image.startswith("http"):
        if image.startswith("/"):
            return f"{_OPENPARLIAMENT_BASE_URL}{image}"
        return f"{_OPENPARLIAMENT_BASE_URL}/{image}"
    return image


def _extract_parl_mp_id(other_info: Dict[str, Any]) -> Optional[int]:
    """Return the numeric parl_mp_id from an ``other_info`` dict, or None if absent/invalid."""
    raw_id = _first(other_info.get("parl_mp_id", []))
    if raw_id is None:
        return None
    try:
        return int(raw_id)
    except (TypeError, ValueError):
        logger.warning(f"Ignoring non-numeric parl_mp_id: {raw_id!r}")
        return None


def _mp_props_from_detail(
    mp_id: str,
    mp_summary: Dict[str, Any],
    mp_data: Dict[str, Any],
    cabinet_positions: Dict[str, Any],
) -> Dict[str, Any]:
    """Build MP node properties from the detailed OpenParliament record."""
    # Extract nested party and riding data from current membership
    current_party = mp_data.get("current_party") or {}
    current_riding = mp_data.get("current_riding") or {}

    # Extract additional info from other_info
    other_info = mp_data.get("other_info") or {}

    # Extract OurCommons link
    links = mp_data.get("links") or []
    ourcommons_url = next(
        (link["url"] for link in links if "ourcommons.ca" in link.get("url", "")),
        None,
    )

    return {
        "id": mp_id,
        "name": mp_data.get("name"),
        "given_name": mp_data.get("given_name"),
        "family_name": mp_data.get("family_name"),
        "gender": mp_data.get("gender"),
        "party": _english(current_party.get("short_name")),
        "riding": _english(current_riding.get("name")),
        "current": mp_summary.get("current", True),
        "elected_date": mp_summary.get("elected"),
        "email": mp_data.get("email"),
        "phone": mp_data.get("voice"),
        "twitter": _first(other_info.get("twitter", [])),
        "wikipedia_id": _first(other_info.get("wikipedia_id", [])),
        # parl_mp_id is used for matching to government roles (Cabinet, PS)
        "parl_mp_id": _extract_parl_mp_id(other_info),
        "constituency_office": _first(other_info.get("constituency_offices", [])),
        "ourcommons_url": ourcommons_url,
        "cabinet_position": cabinet_positions.get(mp_id),
        # OpenParliament photo URL (auto-updated); stored as photo_url_source
        # for the 3-tier fallback system
        "photo_url_source": _absolute_photo_url(mp_data.get("image")),
        "updated_at": datetime.utcnow().isoformat(),
    }


def _mp_props_from_summary(mp_id: str, mp_summary: Dict[str, Any]) -> Dict[str, Any]:
    """Build reduced MP node properties from the list-endpoint summary (fallback path)."""
    current_party = mp_summary.get("current_party") or {}
    current_riding = mp_summary.get("current_riding") or {}
    other_info = mp_summary.get("other_info") or {}

    return {
        "id": mp_id,
        "name": mp_summary.get("name"),
        "party": _english(current_party.get("short_name")),
        "riding": _english(current_riding.get("name")),
        "current": mp_summary.get("current", True),
        "photo_url_source": _absolute_photo_url(mp_summary.get("image")),
        # Needed to look up OurCommons XML data even in the fallback case
        "parl_mp_id": _extract_parl_mp_id(other_info),
        "updated_at": datetime.utcnow().isoformat(),
    }


def _apply_ourcommons_data(mp_props: Dict[str, Any], ourcommons_lookup: Dict[Any, Any]) -> None:
    """Merge honorific, term dates and province from the OurCommons record, if one matches."""
    parl_mp_id = mp_props.get("parl_mp_id")
    if not parl_mp_id or parl_mp_id not in ourcommons_lookup:
        return

    ourcommons_mp = ourcommons_lookup[parl_mp_id]
    mp_props["honorific"] = ourcommons_mp.honorific
    mp_props["term_start_date"] = ourcommons_mp.term_start
    mp_props["term_end_date"] = ourcommons_mp.term_end
    mp_props["province"] = ourcommons_mp.province


def ingest_mps(neo4j_client: Neo4jClient, batch_size: int = 10000) -> int:
    """
    Ingest all MPs from OpenParliament API with full details.

    Augments OpenParliament data with OurCommons XML for missing fields:
    - honorific ("Hon.", "Right Hon.")
    - term_start_date (precise swearing-in date)
    - term_end_date (or None if current)
    - province (direct from XML, not inferred)

    If fetching the detailed record for an MP fails, the MP is still ingested
    from the list-endpoint summary. MPs whose summary has no usable URL are
    skipped with a warning, because their id cannot be derived.

    Args:
        neo4j_client: Neo4j client instance
        batch_size: Batch size for Neo4j operations

    Returns:
        Number of MPs created
    """
    logger.info("Fetching MPs from OpenParliament API...")
    op_client = OpenParliamentClient()

    # Fetch OurCommons MP XML for honorifics, term dates, and province
    logger.info("Fetching MP metadata from OurCommons XML...")
    ourcommons_client = OurCommonsMPsClient()
    ourcommons_mps = ourcommons_client.get_all_mps()

    # Create lookup dict indexed by person_id (matches parl_mp_id in OpenParliament)
    ourcommons_lookup = {mp.person_id: mp for mp in ourcommons_mps}
    logger.info(f"Loaded {len(ourcommons_lookup)} MPs from OurCommons XML")

    # Load cabinet positions
    cabinet_file = _DATA_DIR / "cabinet_positions.json"
    cabinet_positions = {}
    if cabinet_file.exists():
        with open(cabinet_file, "r", encoding="utf-8") as f:
            cabinet_data = json.load(f)
        cabinet_positions = {
            m["slug"]: m["position"]
            for m in cabinet_data.get("cabinet_ministers", [])
        }
        logger.info(f"Loaded {len(cabinet_positions)} cabinet positions")

    # Fetch all MPs (current + historical) - list endpoint for URLs
    mps_list = list(op_client.list_mps())
    logger.info(f"Found {len(mps_list):,} MPs, fetching detailed information...")

    # Fetch detailed data for each MP
    mps_data = []
    for i, mp_summary in enumerate(mps_list):
        if i % 50 == 0:
            logger.info(f"Progress: {i}/{len(mps_list)}")

        # The slug is the second-to-last path segment of the MP URL.
        mp_url = mp_summary.get("url") or ""
        url_parts = mp_url.split("/")
        if len(url_parts) < 2 or not url_parts[-2]:
            # Previously this raised IndexError and aborted the whole ingestion.
            logger.warning(f"Skipping MP without a usable URL: {mp_summary.get('name')}")
            continue
        mp_id = url_parts[-2]

        try:
            # Fetch full MP details
            mp_data = op_client._request(mp_url)
            mp_props = _mp_props_from_detail(mp_id, mp_summary, mp_data, cabinet_positions)
        except Exception as e:
            logger.warning(f"Failed to fetch details for {mp_id}: {e}")
            # Fall back to summary data
            mp_props = _mp_props_from_summary(mp_id, mp_summary)

        # Merge in OurCommons XML data if available (honorific, term dates, province)
        _apply_ourcommons_data(mp_props, ourcommons_lookup)

        # Filter out None values
        mps_data.append({k: v for k, v in mp_props.items() if v is not None})

    # Batch create/update MPs using MERGE (idempotent)
    created = neo4j_client.batch_merge_nodes("MP", mps_data, merge_keys=["id"], batch_size=batch_size)
    logger.success(f"✅ Created/updated {created:,} MPs with full details")
    return created


def ingest_parties(neo4j_client: Neo4jClient) -> int:
    """
    Ingest political parties.

    Note: OpenParliament doesn't have a parties endpoint, so we extract from MPs.

    Returns:
        Number of parties created/updated
    """
    logger.info("Extracting parties from MPs...")

    # Query existing MPs to get unique parties
    result = neo4j_client.run_query(
        """
        MATCH (m:MP)
        WHERE m.party IS NOT NULL
        WITH DISTINCT m.party AS party_name
        RETURN party_name
        ORDER BY party_name
        """
    )

    parties = [record["party_name"] for record in result]
    logger.info(f"Found {len(parties)} unique parties")

    # Manually map party short names to codes and full names
    party_mapping = {
        "Conservative": {"code": "CPC", "name": "Conservative Party of Canada"},
        "Liberal": {"code": "LPC", "name": "Liberal Party of Canada"},
        "NDP": {"code": "NDP", "name": "New Democratic Party"},
        "Bloc Québécois": {"code": "BQ", "name": "Bloc Québécois"},
        "Green": {"code": "GPC", "name": "Green Party of Canada"},
        "Independent": {"code": "IND", "name": "Independent"},
        "People's Party": {"code": "PPC", "name": "People's Party of Canada"},
    }

    parties_data = []
    for party_short in parties:
        mapped = party_mapping.get(party_short)
        if mapped:
            code, name = mapped["code"], mapped["name"]
        else:
            # Fallback for unmapped parties
            code, name = party_short.upper().replace(" ", "_"), party_short
        parties_data.append({
            "code": code,
            "name": name,
            "short_name": party_short,
            "updated_at": datetime.utcnow().isoformat(),
        })

    # Create/update parties using MERGE (idempotent)
    created = neo4j_client.batch_merge_nodes("Party", parties_data, merge_keys=["code"])
    logger.success(f"✅ Created/updated {created} parties")
    return created


def ingest_ridings(neo4j_client: Neo4jClient) -> int:
    """Ingest electoral ridings (extracted from MPs) with province detection."""
    logger.info("Extracting ridings from MPs...")

    result = neo4j_client.run_query(
        """
        MATCH (m:MP)
        WHERE m.riding IS NOT NULL
        WITH DISTINCT m.riding AS riding_name
        RETURN riding_name
        ORDER BY riding_name
        """
    )

    ridings = [record["riding_name"] for record in result]
    logger.info(f"Found {len(ridings)} unique ridings")

    # Create riding nodes with province detection
    ridings_data = []
    provinces_detected = 0
    for riding in ridings:
        province = detect_province(riding)
        if province:
            provinces_detected += 1

        ridings_data.append({
            "id": riding.lower().replace(" ", "-").replace("'", ""),
            "name": riding,
            "province": province,
            "is_vacant": False,  # All current ridings have MPs (extracted from MP data)
        })

    logger.info(f"Detected provinces for {provinces_detected}/{len(ridings)} ridings")

    created = neo4j_client.batch_merge_nodes("Riding", ridings_data, merge_keys=["id"])
    logger.success(f"✅ Created/updated {created} ridings with province data")
    return created


def update_vacant_ridings(neo4j_client: Neo4jClient) -> int:
    """
    Update vacancy status for ridings without current MPs.

    This should be called after MPs and ridings are ingested to identify
    ridings that have no current MP representation.

    Returns:
        Number of vacant ridings detected
    """
    logger.info("Checking for vacant ridings...")

    # Find ridings with no current MP (no REPRESENTS relationship)
    result = neo4j_client.run_query(
        """
        MATCH (r:Riding)
        WHERE NOT EXISTS {
            MATCH (m:MP)-[:REPRESENTS]->(r)
            WHERE m.current = true
        }
        SET r.is_vacant = true
        RETURN count(r) as vacant_count
        """
    )

    vacant_count = result[0]["vacant_count"] if result else 0

    if vacant_count > 0:
        logger.warning(f"Found {vacant_count} vacant ridings")
    else:
        logger.success("✅ No vacant ridings detected")

    return vacant_count


def ingest_bills(
    neo4j_client: Neo4jClient,
    batch_size: int = 10000,
    limit: Optional[int] = None,
    fetch_details: bool = True,
) -> int:
    """
    Ingest bills from OpenParliament API.

    Args:
        neo4j_client: Neo4j client
        batch_size: Batch size for operations
        limit: Limit number of bills (for testing)
        fetch_details: If True, fetch individual bill details to get sponsor info
            (slower but more complete)

    Returns:
        Number of bills merged
    """
    logger.info("Fetching bills from OpenParliament API...")
    op_client = OpenParliamentClient()

    # Get list of bills
    bills_raw = _collect(op_client.list_bills(), limit)
    logger.info(f"Found {len(bills_raw):,} bills")

    # Fetch detailed bill information if requested
    if fetch_details:
        logger.info("Fetching detailed bill information (this will take a while)...")
        bills_detailed = []
        detail_count = 0  # bills for which the detail fetch actually succeeded
        for i, bill in enumerate(bills_raw):
            if (i + 1) % 100 == 0:
                logger.info(f"  Fetched details for {i + 1}/{len(bills_raw)} bills...")

            bill_url = bill.get("url")
            if not bill_url:
                bills_detailed.append(bill)
                continue

            try:
                bill_detail = op_client.get_bill(bill_url)
            except Exception as e:
                logger.warning(f"Failed to fetch details for {bill.get('number')} ({bill.get('session')}): {e}")
                bills_detailed.append(bill)  # Use list data as fallback
            else:
                bills_detailed.append(bill_detail)
                detail_count += 1

        # Log before rebinding bills_raw so the ratio reflects real successes.
        logger.info(f"✅ Fetched details for {detail_count}/{len(bills_raw)} bills")
        bills_raw = bills_detailed

    # Transform to Neo4j format
    bills_data = []
    for bill in bills_raw:
        status = bill.get("status")
        bill_props = {
            "number": bill.get("number"),
            "session": bill.get("session"),
            "title": _english(bill.get("name")) or _english(bill.get("short_title")),
            "status": status.get("name") if isinstance(status, dict) else status,
            "introduced_date": bill.get("introduced"),
            "sponsor_politician_url": bill.get("sponsor_politician_url"),
            "updated_at": datetime.utcnow().isoformat(),
        }
        # Filter None values
        bills_data.append({k: v for k, v in bill_props.items() if v is not None})

    # Batch merge bills (handle duplicates with composite key: number + session)
    created = neo4j_client.batch_merge_nodes(
        "Bill", bills_data, merge_keys=["number", "session"], batch_size=batch_size
    )
    logger.success(f"✅ Merged {created:,} bills")
    return created


def link_bill_sponsors(neo4j_client: Neo4jClient) -> int:
    """
    Create SPONSORED relationships between MPs and Bills.

    This must be run after both MPs and Bills have been ingested.
    Extracts MP slug from sponsor_politician_url and creates relationships.

    Returns:
        Row count reported by the query for matched MP/Bill pairs
    """
    logger.info("Creating SPONSORED relationships...")

    query = """
    MATCH (b:Bill)
    WHERE b.sponsor_politician_url IS NOT NULL
    WITH b, split(b.sponsor_politician_url, '/') AS url_parts
    WITH b, url_parts[size(url_parts)-2] AS mp_slug
    MATCH (m:MP {id: mp_slug})
    MERGE (m)-[:SPONSORED]->(b)
    RETURN count(*) as relationships_created
    """

    result = neo4j_client.run_query(query)
    count = result[0]["relationships_created"] if result else 0

    logger.success(f"✅ Created {count:,} SPONSORED relationships")
    return count


def _parse_session_code(session: Optional[str]) -> Tuple[Optional[int], Optional[int]]:
    """Split a session code like "45-1" into (parliament, session); (None, None) if malformed."""
    if session and "-" in session:
        try:
            parl, sess = session.split("-")
            return int(parl), int(sess)
        except ValueError:
            logger.warning(f"Unrecognized session code: {session!r}")
    return None, None


def ingest_bills_from_legisinfo_json(neo4j_client: Neo4jClient, batch_size: int = 10000) -> int:
    """
    Ingest bills from LEGISinfo JSON bulk export (FAST - single file download).

    This is the preferred method for bill ingestion as it:
    - Downloads a single 204KB JSON file (vs thousands of API calls)
    - Contains 100% sponsor coverage (vs 88% from OpenParliament list)
    - Completes in ~1 second (vs 50+ minutes for full ingestion)
    - No rate limiting concerns

    Also creates SPONSORED relationships via link_bill_sponsors_by_name().

    Args:
        neo4j_client: Neo4j client
        batch_size: Batch size for operations

    Returns:
        Number of bills created/updated

    Raises:
        requests.HTTPError: If the LEGISinfo download returns an error status.
    """
    logger.info("Downloading bills from LEGISinfo JSON bulk export...")

    # Download JSON file
    url = "https://www.parl.ca/legisinfo/en/bills/json"
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    bills_json = response.json()

    logger.info(f"Downloaded {len(bills_json):,} bills from LEGISinfo")

    # Add optional date fields if present
    optional_dates = [
        ("passed_house_first_reading", "PassedHouseFirstReadingDateTime"),
        ("passed_house_second_reading", "PassedHouseSecondReadingDateTime"),
        ("passed_house_third_reading", "PassedHouseThirdReadingDateTime"),
        ("passed_senate_first_reading", "PassedSenateFirstReadingDateTime"),
        ("passed_senate_second_reading", "PassedSenateSecondReadingDateTime"),
        ("passed_senate_third_reading", "PassedSenateThirdReadingDateTime"),
        ("royal_assent", "RoyalAssentDateTime"),
    ]

    # Transform to Neo4j format
    bills_data = []
    # Track sponsors for relationship creation. Keyed by (number, session) tuple:
    # both parts contain "-", so a joined string key cannot be split unambiguously.
    sponsor_mapping: Dict[Tuple[str, str], str] = {}

    for bill in bills_json:
        bill_number = bill.get("BillNumberFormatted")
        session = bill.get("ParlSessionCode")
        sponsor_name = bill.get("SponsorEn")

        # Extract parliament and session from code like "45-1"
        parliament, session_num = _parse_session_code(session)

        bill_props = {
            "number": bill_number,
            "session": session,
            "parliament": parliament,
            "session_number": session_num,
            "title": bill.get("LongTitleEn") or bill.get("ShortTitleEn"),
            "title_fr": bill.get("LongTitleFr") or bill.get("ShortTitleFr"),
            "status": bill.get("CurrentStatusEn"),
            "status_fr": bill.get("CurrentStatusFr"),
            "sponsor_name": sponsor_name,
            "introduced_date": bill.get("IntroducedDateTime"),
            "updated_at": datetime.utcnow().isoformat(),
        }

        for prop_name, json_field in optional_dates:
            if bill.get(json_field):
                bill_props[prop_name] = bill.get(json_field)

        # Filter None values
        bills_data.append({k: v for k, v in bill_props.items() if v is not None})

        # Track sponsor for relationship creation
        if sponsor_name and bill_number and session:
            sponsor_mapping[(bill_number, session)] = sponsor_name

    # Batch merge bills
    created = neo4j_client.batch_merge_nodes(
        "Bill", bills_data, merge_keys=["number", "session"], batch_size=batch_size
    )
    logger.success(f"✅ Merged {created:,} bills from LEGISinfo JSON")

    # Create SPONSORED relationships using sponsor names
    logger.info("Creating SPONSORED relationships from sponsor names...")
    rel_count = link_bill_sponsors_by_name(neo4j_client, sponsor_mapping)
    logger.success(f"✅ Created {rel_count:,} SPONSORED relationships")

    return created


def _split_bill_key(bill_key: Union[str, Tuple[str, str]]) -> Optional[Tuple[str, str]]:
    """Resolve a sponsor-mapping key to (bill_number, session), or None if unparsable."""
    if isinstance(bill_key, tuple):
        return (bill_key[0], bill_key[1]) if len(bill_key) == 2 else None

    # Legacy "bill_number-session" string. Session codes look like "45-1", so a
    # plain rsplit("-", 1) would turn "C-2-45-1" into ("C-2-45", "1").
    match = _BILL_KEY_RE.match(bill_key)
    if match:
        return match.group("number"), match.group("session")

    # Session without a dash: fall back to splitting on the last "-".
    parts = bill_key.rsplit("-", 1)
    if len(parts) == 2:
        return parts[0], parts[1]
    return None


def link_bill_sponsors_by_name(
    neo4j_client: Neo4jClient,
    sponsor_mapping: Dict[Union[str, Tuple[str, str]], str],
) -> int:
    """
    Create SPONSORED relationships using sponsor names from LEGISinfo.

    LEGISinfo provides sponsor names (e.g., "Hon. Mark Carney") rather than URLs,
    so we need to match against MP names in the database.

    Args:
        neo4j_client: Neo4j client
        sponsor_mapping: Dict mapping a bill key to sponsor name. The key is either
            a ``(bill_number, session)`` tuple (preferred, unambiguous) or the
            legacy ``"bill_number-session"`` string.

    Returns:
        Number of distinct (MP, Bill) pairs linked
    """
    if not sponsor_mapping:
        return 0

    # Strategy: Try exact name match, then fuzzy match on family name
    query = """
    UNWIND $bill_sponsors AS bs
    MATCH (b:Bill {number: bs.bill_number, session: bs.session})

    // Try exact match first
    OPTIONAL MATCH (m1:MP)
    WHERE m1.name = bs.sponsor_name
       OR m1.name CONTAINS bs.sponsor_name
       OR bs.sponsor_name CONTAINS m1.name

    // If no exact match, try matching on family name
    OPTIONAL MATCH (m2:MP)
    WHERE bs.sponsor_name CONTAINS m2.family_name
      AND m2.family_name IS NOT NULL
      AND size(m2.family_name) > 3

    WITH b, COALESCE(m1, m2) AS mp
    WHERE mp IS NOT NULL

    // The two OPTIONAL MATCHes yield a cross product; collapse duplicate pairs
    // so each relationship is merged and counted once.
    WITH DISTINCT b, mp
    MERGE (mp)-[:SPONSORED]->(b)
    RETURN count(*) as relationships_created
    """

    # Prepare data
    bill_sponsors = []
    for bill_key, sponsor_name in sponsor_mapping.items():
        parsed = _split_bill_key(bill_key)
        if parsed is None:
            continue
        bill_number, session = parsed

        # Clean sponsor name - remove titles
        clean_name = sponsor_name
        for title in _SPONSOR_TITLES:
            clean_name = clean_name.replace(title, "")
        clean_name = clean_name.strip()

        bill_sponsors.append({
            "bill_number": bill_number,
            "session": session,
            "sponsor_name": clean_name,
        })

    result = neo4j_client.run_query(query, {"bill_sponsors": bill_sponsors})
    count = result[0]["relationships_created"] if result else 0

    return count


def ingest_votes(neo4j_client: Neo4jClient, batch_size: int = 10000, limit: Optional[int] = None) -> int:
    """
    Ingest parliamentary votes.

    Args:
        neo4j_client: Neo4j client
        batch_size: Batch size for operations
        limit: Limit number of votes (for testing)

    Returns:
        Number of votes created
    """
    logger.info("Fetching votes from OpenParliament API...")
    op_client = OpenParliamentClient()

    votes_raw = _collect(op_client.list_votes(), limit)
    logger.info(f"Found {len(votes_raw):,} votes")

    # Transform to Neo4j format
    votes_data = []
    for vote in votes_raw:
        vote_props = {
            "id": f"{vote.get('session')}-{vote.get('number')}",
            "number": vote.get("number"),
            "session": vote.get("session"),
            "date": vote.get("date"),
            "result": vote.get("result"),
            "yeas": vote.get("yea_total"),  # OpenParliament uses yea_total, not yeas
            "nays": vote.get("nay_total"),  # OpenParliament uses nay_total, not nays
            "paired": vote.get("paired_total"),  # OpenParliament uses paired_total
            "bill_url": vote.get("bill_url"),  # Capture bill URL for relationship creation
            "updated_at": datetime.utcnow().isoformat(),
        }
        # Filter None values
        votes_data.append({k: v for k, v in vote_props.items() if v is not None})

    # Batch create votes
    # NOTE(review): every other ingester here uses batch_merge_nodes; this one uses
    # batch_create_nodes, so re-running presumably duplicates Vote nodes — confirm
    # whether a MERGE on "id" is wanted.
    created = neo4j_client.batch_create_nodes("Vote", votes_data, batch_size=batch_size)
    logger.success(f"✅ Created {created:,} votes")
    return created


def ingest_committees(neo4j_client: Neo4jClient) -> int:
    """
    Ingest parliamentary committees.

    Returns:
        Number of committees created/updated
    """
    logger.info("Fetching committees from OpenParliament API...")
    op_client = OpenParliamentClient()

    committees_raw = list(op_client.list_committees())
    logger.info(f"Found {len(committees_raw)} committees")

    # Transform to Neo4j format
    committees_data = []
    for committee in committees_raw:
        # Determine chamber from parent_url (null = Commons, otherwise Senate/Joint)
        parent_url = committee.get("parent_url")
        if parent_url and "senate" in parent_url.lower():
            chamber = "Senate"
        elif parent_url:
            chamber = "Joint"
        else:
            chamber = "Commons"

        # Guard against explicit nulls so a missing name does not abort the run.
        name = committee.get("name") or {}
        short_name = committee.get("short_name") or {}

        committee_props = {
            "code": committee.get("slug"),
            "name": name.get("en") or short_name.get("en"),
            "name_fr": name.get("fr") or short_name.get("fr"),
            "short_name": short_name.get("en"),
            "short_name_fr": short_name.get("fr"),
            "chamber": chamber,
            "updated_at": datetime.utcnow().isoformat(),
        }
        # Filter None values
        committees_data.append({k: v for k, v in committee_props.items() if v is not None})

    # Create/update committees using MERGE (idempotent)
    created = neo4j_client.batch_merge_nodes("Committee", committees_data, merge_keys=["code"])
    logger.success(f"✅ Created/updated {created} committees")
    return created


def ingest_government_roles(neo4j_client: Neo4jClient, batch_size: int = 10000) -> int:
    """
    Ingest Cabinet ministers and Parliamentary Secretaries from House of Commons XML.

    Creates Role nodes with temporal tracking (from_date, to_date, is_current) for:
    - Cabinet ministers (including Prime Minister)
    - Parliamentary Secretaries

    Args:
        neo4j_client: Neo4j client instance
        batch_size: Batch size for Neo4j operations

    Returns:
        Number of roles created
    """
    logger.info("Fetching government roles from House of Commons XML...")

    from fedmcp.clients.roles import GovernmentRolesClient

    roles_client = GovernmentRolesClient()
    roles_data = []

    # Fetch ministers
    logger.info("Fetching Cabinet ministers...")
    ministers = roles_client.get_ministers()
    logger.info(f"Found {len(ministers)} ministers")

    for minister in ministers:
        role_props = {
            # person_id + start timestamp keeps successive appointments distinct
            "id": f"minister-{minister.person_id}-{int(minister.from_date.timestamp())}",
            "person_id": minister.person_id,
            "title": minister.title,
            "role_type": "Prime Minister" if minister.title == "Prime Minister" else "Cabinet Minister",
            "from_date": minister.from_date.isoformat(),
            "to_date": minister.to_date.isoformat() if minister.to_date else None,
            "order_of_precedence": minister.order_of_precedence,
            "is_current": minister.is_current,
            "updated_at": datetime.utcnow().isoformat(),
        }
        # Filter None values
        roles_data.append({k: v for k, v in role_props.items() if v is not None})

    # Fetch Parliamentary Secretaries
    logger.info("Fetching Parliamentary Secretaries...")
    secretaries = roles_client.get_parliamentary_secretaries()
    logger.info(f"Found {len(secretaries)} Parliamentary Secretaries")

    for ps in secretaries:
        role_props = {
            "id": f"ps-{ps.person_id}",
            "person_id": ps.person_id,
            "title": ps.title,
            "role_type": "Parliamentary Secretary",
            "is_current": True,  # PS data only includes current roles
            "updated_at": datetime.utcnow().isoformat(),
        }
        roles_data.append({k: v for k, v in role_props.items() if v is not None})

    # Batch create/update roles using MERGE (idempotent)
    created = neo4j_client.batch_merge_nodes("Role", roles_data, merge_keys=["id"], batch_size=batch_size)
    logger.success(f"✅ Created/updated {created} government roles")
    return created


def link_government_roles(neo4j_client: Neo4jClient) -> int:
    """
    Create HOLDS_ROLE relationships between MPs and government Roles.

    Matches MPs to Roles using the numeric parl_mp_id field from OpenParliament API
    which corresponds to the person_id field in government roles XML.

    This must be run after both MPs and Roles have been ingested.

    Args:
        neo4j_client: Neo4j client instance

    Returns:
        Number of relationships created
    """
    logger.info("Creating HOLDS_ROLE relationships...")

    query = """
    MATCH (r:Role)
    WHERE r.person_id IS NOT NULL
    MATCH (m:MP)
    WHERE m.parl_mp_id = r.person_id
    MERGE (m)-[:HOLDS_ROLE]->(r)
    RETURN count(*) as relationships_created
    """

    result = neo4j_client.run_query(query)
    count = result[0]["relationships_created"] if result else 0

    logger.success(f"✅ Created {count:,} HOLDS_ROLE relationships")
    return count


def ingest_committee_memberships(neo4j_client: Neo4jClient) -> Dict[str, int]:
    """
    Scrape committee membership from House of Commons website and create SERVES_ON relationships.

    This scrapes the current membership from committee pages at ourcommons.ca
    and creates relationships between MPs and Committees with role information.

    Uses fuzzy name matching with normalization to improve MP linking success rate.
    A failure while processing one committee is logged and does not stop the others.

    Args:
        neo4j_client: Neo4j client instance

    Returns:
        Dict with counts: serves_on_created, mp_not_found
    """
    logger.info("=" * 60)
    logger.info("COMMITTEE MEMBERSHIP INGESTION")
    logger.info("=" * 60)

    from fedmcp.clients.committee_membership import CommitteeMembershipClient

    # Get all committees from Neo4j
    query = "MATCH (c:Committee) RETURN c.code as code, c.name as name"
    committees = neo4j_client.run_query(query)

    if not committees:
        logger.warning("No committees found in database. Run ingest_committees first.")
        return {"serves_on_created": 0, "mp_not_found": 0}

    # Try to match MP by name with improved fuzzy matching
    # Strategy: Try exact match first, then normalized match
    match_query = """
    MATCH (m:MP)
    WHERE
        // Exact match on name
        toLower(m.name) = toLower($member_name)
        // Or match Given + Family
        OR (m.given_name IS NOT NULL AND m.family_name IS NOT NULL
            AND toLower(m.given_name + ' ' + m.family_name) = toLower($member_name))
        // Or match Family, Given (common format)
        OR (m.given_name IS NOT NULL AND m.family_name IS NOT NULL
            AND toLower(m.family_name + ', ' + m.given_name) = toLower($member_name))
        // Or match with normalized name
        OR toLower(m.name) = toLower($normalized_name)
        OR (m.given_name IS NOT NULL AND m.family_name IS NOT NULL
            AND toLower(m.given_name + ' ' + m.family_name) = toLower($normalized_name))
    MATCH (c:Committee {code: $committee_code})
    MERGE (m)-[r:SERVES_ON]->(c)
    SET r.role = $role,
        r.updated_at = $updated_at
    RETURN count(*) as created
    """

    membership_client = CommitteeMembershipClient()
    stats = {"serves_on_created": 0, "mp_not_found": 0}
    failed_committees = []

    for committee in committees:
        code = committee["code"]
        name = committee["name"]

        try:
            logger.info(f"Processing {code} ({name})...")
            members = membership_client.get_committee_members(code)
            logger.info(f"  Found {len(members)} members")

            # Create relationships for each member
            for member in members:
                result = neo4j_client.run_query(
                    match_query,
                    {
                        "member_name": member.name,
                        "normalized_name": normalize_mp_name(member.name),
                        "committee_code": code,
                        "role": member.role,
                        "updated_at": datetime.utcnow().isoformat(),
                    },
                )

                if result and result[0]["created"] > 0:
                    stats["serves_on_created"] += result[0]["created"]
                    logger.debug(f"    ✓ Linked {member.name} as {member.role}")
                else:
                    logger.warning(f"    ✗ Could not find MP: {member.name}")
                    stats["mp_not_found"] += 1

        except Exception as e:
            logger.error(f"  Error processing {code}: {e}")
            failed_committees.append(code)
            continue

    logger.info("=" * 60)
    logger.success("✅ COMMITTEE MEMBERSHIP INGESTION COMPLETE")
    logger.info(f"SERVES_ON relationships created: {stats['serves_on_created']}")
    logger.info(f"MPs not found: {stats['mp_not_found']}")
    if failed_committees:
        logger.warning(
            f"Failed to fetch membership for {len(failed_committees)} committees: {', '.join(failed_committees)}"
        )
    logger.info("=" * 60)

    return stats


def normalize_mp_name(name: str) -> str:
    """
    Normalize MP name for matching.

    Handles:
    - Accents (é → e, ç → c)
    - Honorifics (Hon., Rt. Hon., Right Hon., Dr., Mr., Mrs., Ms.)
    - Extra whitespace

    Pattern from finances.py.

    Args:
        name: MP name (e.g., "Hon. Dominic LeBlanc" or "Marie-Claude Bibeau")

    Returns:
        Normalized name
    """
    # Remove accents: decompose, then drop the combining marks
    name = "".join(
        c for c in unicodedata.normalize("NFD", name)
        if unicodedata.category(c) != "Mn"
    )

    # Remove honorifics. Compound forms are listed before "Hon." so that
    # "Rt. Hon. X" does not leave a stray "Rt." behind.
    for honorific in _NAME_HONORIFICS:
        name = name.replace(honorific, "")

    # Trim whitespace
    return " ".join(name.split())


def ingest_parliament_data(
    neo4j_client: Neo4jClient,
    batch_size: int = 10000,
    limit_bills: Optional[int] = None,
    limit_votes: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Run full parliament data ingestion pipeline.

    Args:
        neo4j_client: Neo4j client
        batch_size: Batch size for operations
        limit_bills: Limit number of bills (for testing). Currently unused: the
            LEGISinfo bulk export used for bills is always ingested in full.
        limit_votes: Limit number of votes (for testing)

    Returns:
        Dict with counts of created entities. ``bill_sponsors`` is the string
        "included" because sponsor links are created inside the bill ingestion.
    """
    logger.info("=" * 60)
    logger.info("PARLIAMENT DATA INGESTION")
    logger.info("=" * 60)

    stats: Dict[str, Any] = {}

    # 1. MPs
    stats["mps"] = ingest_mps(neo4j_client, batch_size)

    # 2. Parties (derived from MPs)
    stats["parties"] = ingest_parties(neo4j_client)

    # 3. Ridings (derived from MPs)
    stats["ridings"] = ingest_ridings(neo4j_client)

    # 4. Bills (using LEGISinfo JSON bulk export - fast!)
    # Note: This also creates SPONSORED relationships internally
    stats["bills"] = ingest_bills_from_legisinfo_json(neo4j_client, batch_size)
    stats["bill_sponsors"] = "included"  # Handled by ingest_bills_from_legisinfo_json

    # 5. Votes
    stats["votes"] = ingest_votes(neo4j_client, batch_size, limit=limit_votes)

    # 6. Committees
    stats["committees"] = ingest_committees(neo4j_client)

    # 7. Government roles (Cabinet ministers and Parliamentary Secretaries)
    stats["roles"] = ingest_government_roles(neo4j_client, batch_size)

    # 8. Link government roles to MPs
    stats["role_relationships"] = link_government_roles(neo4j_client)

    logger.info("=" * 60)
    logger.success("✅ PARLIAMENT DATA INGESTION COMPLETE")
    logger.info(f"MPs: {stats['mps']:,}")
    logger.info(f"Parties: {stats['parties']}")
    logger.info(f"Ridings: {stats['ridings']}")
    logger.info(f"Bills: {stats['bills']:,}")
    # bill_sponsors is a string marker; a "," format spec on a str raises ValueError.
    logger.info(f"Bill Sponsors: {stats['bill_sponsors']}")
    logger.info(f"Votes: {stats['votes']:,}")
    logger.info(f"Committees: {stats['committees']}")
    logger.info(f"Government Roles: {stats['roles']}")
    logger.info(f"MP-Role Relationships: {stats['role_relationships']:,}")
    logger.info("=" * 60)

    return stats

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/northernvariables/FedMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server