Skip to main content
Glama
northernvariables

FedMCP - Federal Parliamentary Information

bill_structure.py32.5 kB
"""Bill structure ingestion from Parliament.ca XML to Neo4j. This module imports structured bill content (parts, sections, subsections, paragraphs) as well as version history and amendment events from LEGISinfo. Data Sources: - Parliament.ca Bill XML: /Content/Bills/{parliament}/{type}/{bill}/{bill}_{version}/{bill}_E.xml - LEGISinfo JSON API: /LegisInfo/en/bill/{session}/{bill}/json Neo4j Schema: Nodes: - BillVersion: Version snapshots through legislative process - BillAmendmentEvent: Amendment-related events (committee reports, etc.) - BillPart: Top-level divisions (Part I, Part II) - BillSection: Main numbered sections (1, 2, 3...) - BillSubsection: Subsections (1), (2), (3)... - BillParagraph: Paragraphs (a), (b), (c)... - BillSubparagraph: Subparagraphs (i), (ii), (iii)... - BillDefinition: Definitions within bills Relationships: - (Bill)-[:HAS_VERSION]->(BillVersion) - (Bill)-[:HAS_AMENDMENT_EVENT]->(BillAmendmentEvent) - (Bill)-[:HAS_PART]->(BillPart) - (Bill)-[:HAS_SECTION]->(BillSection) (for sections not in parts) - (BillPart)-[:HAS_SECTION]->(BillSection) - (BillSection)-[:HAS_SUBSECTION]->(BillSubsection) - (BillSubsection)-[:HAS_PARAGRAPH]->(BillParagraph) - (BillParagraph)-[:HAS_SUBPARAGRAPH]->(BillSubparagraph) - (Bill)-[:HAS_DEFINITION]->(BillDefinition) Example: >>> from fedmcp_pipeline.utils.neo4j_client import Neo4jClient >>> neo4j = Neo4jClient(uri="bolt://localhost:7687", user="neo4j", password="password") >>> >>> # Ingest a single bill >>> result = ingest_bill_structure(neo4j, parliament=44, session=1, bill_number="C-2") >>> >>> # Ingest multiple bills >>> results = ingest_bills_from_list(neo4j, bills=[("44-1", "C-2"), ("44-1", "C-3")]) """ import sys from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple # Add fedmcp package to path FEDMCP_PATH = Path(__file__).parent.parent.parent.parent / "fedmcp" / "src" sys.path.insert(0, str(FEDMCP_PATH)) from fedmcp.clients.bill_text_xml import ( BillTextXMLClient, ParsedBill, BillVersion, BillAmendmentEvent, BillPart, BillSection, BillSubsection, BillParagraph, BillSubparagraph, BillDefinition, to_dict, ) from ..utils.neo4j_client import Neo4jClient from ..utils.progress import logger, ProgressTracker def create_bill_structure_schema(neo4j_client: Neo4jClient) -> None: """Create Neo4j schema for bill structure nodes. Creates: - Unique constraints on all structure node IDs - Indexes on anchor_id for fast lookups - Indexes on bill_id for relationship creation """ logger.info("Creating bill structure schema...") # BillVersion neo4j_client.run_query(""" CREATE CONSTRAINT bill_version_id IF NOT EXISTS FOR (bv:BillVersion) REQUIRE bv.id IS UNIQUE """) neo4j_client.run_query(""" CREATE INDEX bill_version_bill_id IF NOT EXISTS FOR (bv:BillVersion) ON (bv.bill_id) """) # BillAmendmentEvent neo4j_client.run_query(""" CREATE CONSTRAINT bill_amendment_event_id IF NOT EXISTS FOR (bae:BillAmendmentEvent) REQUIRE bae.id IS UNIQUE """) neo4j_client.run_query(""" CREATE INDEX bill_amendment_event_bill_id IF NOT EXISTS FOR (bae:BillAmendmentEvent) ON (bae.bill_id) """) # BillPart neo4j_client.run_query(""" CREATE CONSTRAINT bill_part_id IF NOT EXISTS FOR (bp:BillPart) REQUIRE bp.id IS UNIQUE """) neo4j_client.run_query(""" CREATE INDEX bill_part_anchor_id IF NOT EXISTS FOR (bp:BillPart) ON (bp.anchor_id) """) neo4j_client.run_query(""" CREATE INDEX bill_part_bill_id IF NOT EXISTS FOR (bp:BillPart) ON (bp.bill_id) """) # BillSection neo4j_client.run_query(""" CREATE CONSTRAINT bill_section_id IF NOT EXISTS FOR (bs:BillSection) REQUIRE bs.id IS UNIQUE """) neo4j_client.run_query(""" CREATE INDEX bill_section_anchor_id IF NOT EXISTS FOR (bs:BillSection) ON (bs.anchor_id) """) neo4j_client.run_query(""" CREATE INDEX bill_section_bill_id IF NOT EXISTS FOR (bs:BillSection) ON (bs.bill_id) """) # BillSubsection neo4j_client.run_query(""" CREATE CONSTRAINT bill_subsection_id IF NOT EXISTS FOR (bss:BillSubsection) REQUIRE bss.id IS UNIQUE """) neo4j_client.run_query(""" CREATE INDEX bill_subsection_anchor_id IF NOT EXISTS FOR (bss:BillSubsection) ON (bss.anchor_id) """) neo4j_client.run_query(""" CREATE INDEX bill_subsection_section_id IF NOT EXISTS FOR (bss:BillSubsection) ON (bss.section_id) """) # BillParagraph neo4j_client.run_query(""" CREATE CONSTRAINT bill_paragraph_id IF NOT EXISTS FOR (bpg:BillParagraph) REQUIRE bpg.id IS UNIQUE """) neo4j_client.run_query(""" CREATE INDEX bill_paragraph_anchor_id IF NOT EXISTS FOR (bpg:BillParagraph) ON (bpg.anchor_id) """) neo4j_client.run_query(""" CREATE INDEX bill_paragraph_subsection_id IF NOT EXISTS FOR (bpg:BillParagraph) ON (bpg.subsection_id) """) # BillSubparagraph neo4j_client.run_query(""" CREATE CONSTRAINT bill_subparagraph_id IF NOT EXISTS FOR (bsp:BillSubparagraph) REQUIRE bsp.id IS UNIQUE """) neo4j_client.run_query(""" CREATE INDEX bill_subparagraph_anchor_id IF NOT EXISTS FOR (bsp:BillSubparagraph) ON (bsp.anchor_id) """) neo4j_client.run_query(""" CREATE INDEX bill_subparagraph_paragraph_id IF NOT EXISTS FOR (bsp:BillSubparagraph) ON (bsp.paragraph_id) """) # BillDefinition neo4j_client.run_query(""" CREATE CONSTRAINT bill_definition_id IF NOT EXISTS FOR (bd:BillDefinition) REQUIRE bd.id IS UNIQUE """) neo4j_client.run_query(""" CREATE INDEX bill_definition_bill_id IF NOT EXISTS FOR (bd:BillDefinition) ON (bd.bill_id) """) logger.info("✅ Bill structure schema created") def ingest_bill_versions( neo4j_client: Neo4jClient, bill: ParsedBill, ) -> int: """Ingest bill version nodes and relationships. Args: neo4j_client: Neo4j client instance bill: Parsed bill with available_versions Returns: Number of versions created """ if not bill.available_versions: return 0 bill_id = f"{bill.session_str}:{bill.bill_number}" now = datetime.utcnow().isoformat() versions_data = [] for ver in bill.available_versions: version_id = f"{bill_id}:v{ver.version_number}" versions_data.append({ "id": version_id, "bill_id": bill_id, "version_number": ver.version_number, "stage": ver.stage.value if ver.stage else "first-reading", "publication_type_name": ver.publication_type_name, "publication_date": ver.publication_date.isoformat() if ver.publication_date else None, "has_amendments": ver.has_amendments, "xml_url": ver.xml_url, "pdf_url": ver.pdf_url, "updated_at": now, }) # Create BillVersion nodes cypher = """ UNWIND $versions AS v MERGE (bv:BillVersion {id: v.id}) SET bv.bill_id = v.bill_id, bv.version_number = v.version_number, bv.stage = v.stage, bv.publication_type_name = v.publication_type_name, bv.publication_date = CASE WHEN v.publication_date IS NOT NULL THEN datetime(v.publication_date) ELSE NULL END, bv.has_amendments = v.has_amendments, bv.xml_url = v.xml_url, bv.pdf_url = v.pdf_url, bv.updated_at = datetime(v.updated_at) WITH bv, v, split(v.bill_id, ':') AS parts MATCH (b:Bill {session: parts[0], number: parts[1]}) MERGE (b)-[:HAS_VERSION]->(bv) RETURN count(bv) as created """ result = neo4j_client.run_query(cypher, {"versions": versions_data}) created = result[0]["created"] if result else 0 logger.info(f" ✅ Created {created} BillVersion nodes") return created def ingest_amendment_events( neo4j_client: Neo4jClient, bill: ParsedBill, ) -> int: """Ingest bill amendment event nodes and relationships. Args: neo4j_client: Neo4j client instance bill: Parsed bill with amendment_events Returns: Number of amendment events created """ if not bill.amendment_events: return 0 bill_id = f"{bill.session_str}:{bill.bill_number}" now = datetime.utcnow().isoformat() events_data = [] for i, event in enumerate(bill.amendment_events, start=1): event_id = f"{bill_id}:amend:{i}" events_data.append({ "id": event_id, "bill_id": bill_id, "event_type": event.event_type, "description_en": event.description_en, "description_fr": event.description_fr, "event_date": event.event_date.isoformat() if event.event_date else None, "chamber": event.chamber, "stage": event.stage, "committee_code": event.committee_code, "committee_name": event.committee_name, "report_id": event.report_id, "report_number": event.report_number, "number_of_amendments": event.number_of_amendments, "updated_at": now, }) # Create BillAmendmentEvent nodes cypher = """ UNWIND $events AS e MERGE (bae:BillAmendmentEvent {id: e.id}) SET bae.bill_id = e.bill_id, bae.event_type = e.event_type, bae.description_en = e.description_en, bae.description_fr = e.description_fr, bae.event_date = CASE WHEN e.event_date IS NOT NULL THEN datetime(e.event_date) ELSE NULL END, bae.chamber = e.chamber, bae.stage = e.stage, bae.committee_code = e.committee_code, bae.committee_name = e.committee_name, bae.report_id = e.report_id, bae.report_number = e.report_number, bae.number_of_amendments = e.number_of_amendments, bae.updated_at = datetime(e.updated_at) WITH bae, e, split(e.bill_id, ':') AS parts MATCH (b:Bill {session: parts[0], number: parts[1]}) MERGE (b)-[:HAS_AMENDMENT_EVENT]->(bae) RETURN count(bae) as created """ result = neo4j_client.run_query(cypher, {"events": events_data}) created = result[0]["created"] if result else 0 logger.info(f" ✅ Created {created} BillAmendmentEvent nodes") return created def ingest_bill_parts( neo4j_client: Neo4jClient, bill: ParsedBill, ) -> int: """Ingest bill part nodes and relationships. Args: neo4j_client: Neo4j client instance bill: Parsed bill with parts Returns: Number of parts created """ if not bill.parts: return 0 bill_id = f"{bill.session_str}:{bill.bill_number}" now = datetime.utcnow().isoformat() parts_data = [] for part in bill.parts: parts_data.append({ "id": part.id, "bill_id": bill_id, "number": part.number, "title_en": part.title_en, "title_fr": part.title_fr, "anchor_id": part.anchor_id, "sequence": part.sequence, "updated_at": now, }) # Create BillPart nodes cypher = """ UNWIND $parts AS p MERGE (bp:BillPart {id: p.id}) SET bp.bill_id = p.bill_id, bp.number = p.number, bp.title_en = p.title_en, bp.title_fr = p.title_fr, bp.anchor_id = p.anchor_id, bp.sequence = p.sequence, bp.updated_at = datetime(p.updated_at) WITH bp, p, split(p.bill_id, ':') AS parts MATCH (b:Bill {session: parts[0], number: parts[1]}) MERGE (b)-[:HAS_PART]->(bp) RETURN count(bp) as created """ result = neo4j_client.run_query(cypher, {"parts": parts_data}) created = result[0]["created"] if result else 0 logger.info(f" ✅ Created {created} BillPart nodes") return created def ingest_bill_sections( neo4j_client: Neo4jClient, bill: ParsedBill, ) -> int: """Ingest bill section nodes and relationships. Handles both: - Sections within parts: (BillPart)-[:HAS_SECTION]->(BillSection) - Loose sections (no part): (Bill)-[:HAS_SECTION]->(BillSection) Args: neo4j_client: Neo4j client instance bill: Parsed bill with parts and sections Returns: Number of sections created """ bill_id = f"{bill.session_str}:{bill.bill_number}" now = datetime.utcnow().isoformat() sections_data = [] # Sections within parts for part in bill.parts: for section in part.sections: sections_data.append({ "id": section.id, "bill_id": bill_id, "part_id": part.id, "number": section.number, "marginal_note_en": section.marginal_note_en, "marginal_note_fr": section.marginal_note_fr, "text_en": section.text_en, "text_fr": section.text_fr, "anchor_id": section.anchor_id, "sequence": section.sequence, "updated_at": now, }) # Loose sections (not in any part) for section in bill.sections: sections_data.append({ "id": section.id, "bill_id": bill_id, "part_id": None, "number": section.number, "marginal_note_en": section.marginal_note_en, "marginal_note_fr": section.marginal_note_fr, "text_en": section.text_en, "text_fr": section.text_fr, "anchor_id": section.anchor_id, "sequence": section.sequence, "updated_at": now, }) if not sections_data: return 0 # Create BillSection nodes cypher = """ UNWIND $sections AS s MERGE (bs:BillSection {id: s.id}) SET bs.bill_id = s.bill_id, bs.part_id = s.part_id, bs.number = s.number, bs.marginal_note_en = s.marginal_note_en, bs.marginal_note_fr = s.marginal_note_fr, bs.text_en = s.text_en, bs.text_fr = s.text_fr, bs.anchor_id = s.anchor_id, bs.sequence = s.sequence, bs.updated_at = datetime(s.updated_at) RETURN count(bs) as created """ result = neo4j_client.run_query(cypher, {"sections": sections_data}) created = result[0]["created"] if result else 0 # Create relationships for sections in parts cypher_part_rel = """ UNWIND $sections AS s MATCH (bs:BillSection {id: s.id}) MATCH (bp:BillPart {id: s.part_id}) WHERE s.part_id IS NOT NULL MERGE (bp)-[:HAS_SECTION]->(bs) """ neo4j_client.run_query(cypher_part_rel, {"sections": sections_data}) # Create relationships for loose sections (Bill -> Section) # Note: Bill nodes don't have an 'id' property - they're matched by session + number # bill_id format is "45-1:C-2", so we split on ':' to get session and number cypher_bill_rel = """ UNWIND $sections AS s MATCH (bs:BillSection {id: s.id}) WITH bs, s, split(s.bill_id, ':') AS parts MATCH (b:Bill {session: parts[0], number: parts[1]}) WHERE s.part_id IS NULL MERGE (b)-[:HAS_SECTION]->(bs) """ neo4j_client.run_query(cypher_bill_rel, {"sections": sections_data}) logger.info(f" ✅ Created {created} BillSection nodes") return created def ingest_bill_subsections( neo4j_client: Neo4jClient, bill: ParsedBill, ) -> int: """Ingest bill subsection nodes and relationships. Args: neo4j_client: Neo4j client instance bill: Parsed bill with parts and sections Returns: Number of subsections created """ now = datetime.utcnow().isoformat() subsections_data = [] # Collect all subsections from all sections all_sections = list(bill.sections) # Loose sections for part in bill.parts: all_sections.extend(part.sections) for section in all_sections: for subsection in section.subsections: subsections_data.append({ "id": subsection.id, "section_id": section.id, "number": subsection.number, "text_en": subsection.text_en, "text_fr": subsection.text_fr, "anchor_id": subsection.anchor_id, "sequence": subsection.sequence, "updated_at": now, }) if not subsections_data: return 0 # Create BillSubsection nodes cypher = """ UNWIND $subsections AS ss MERGE (bss:BillSubsection {id: ss.id}) SET bss.section_id = ss.section_id, bss.number = ss.number, bss.text_en = ss.text_en, bss.text_fr = ss.text_fr, bss.anchor_id = ss.anchor_id, bss.sequence = ss.sequence, bss.updated_at = datetime(ss.updated_at) WITH bss, ss MATCH (bs:BillSection {id: ss.section_id}) MERGE (bs)-[:HAS_SUBSECTION]->(bss) RETURN count(bss) as created """ result = neo4j_client.run_query(cypher, {"subsections": subsections_data}) created = result[0]["created"] if result else 0 logger.info(f" ✅ Created {created} BillSubsection nodes") return created def ingest_bill_paragraphs( neo4j_client: Neo4jClient, bill: ParsedBill, ) -> int: """Ingest bill paragraph nodes and relationships. Args: neo4j_client: Neo4j client instance bill: Parsed bill with parts and sections Returns: Number of paragraphs created """ now = datetime.utcnow().isoformat() paragraphs_data = [] # Collect all paragraphs from all subsections all_sections = list(bill.sections) for part in bill.parts: all_sections.extend(part.sections) for section in all_sections: for subsection in section.subsections: for paragraph in subsection.paragraphs: paragraphs_data.append({ "id": paragraph.id, "subsection_id": subsection.id, "letter": paragraph.letter, "text_en": paragraph.text_en, "text_fr": paragraph.text_fr, "anchor_id": paragraph.anchor_id, "sequence": paragraph.sequence, "updated_at": now, }) if not paragraphs_data: return 0 # Create BillParagraph nodes cypher = """ UNWIND $paragraphs AS p MERGE (bpg:BillParagraph {id: p.id}) SET bpg.subsection_id = p.subsection_id, bpg.letter = p.letter, bpg.text_en = p.text_en, bpg.text_fr = p.text_fr, bpg.anchor_id = p.anchor_id, bpg.sequence = p.sequence, bpg.updated_at = datetime(p.updated_at) WITH bpg, p MATCH (bss:BillSubsection {id: p.subsection_id}) MERGE (bss)-[:HAS_PARAGRAPH]->(bpg) RETURN count(bpg) as created """ result = neo4j_client.run_query(cypher, {"paragraphs": paragraphs_data}) created = result[0]["created"] if result else 0 logger.info(f" ✅ Created {created} BillParagraph nodes") return created def ingest_bill_subparagraphs( neo4j_client: Neo4jClient, bill: ParsedBill, ) -> int: """Ingest bill subparagraph nodes and relationships. Args: neo4j_client: Neo4j client instance bill: Parsed bill with parts and sections Returns: Number of subparagraphs created """ now = datetime.utcnow().isoformat() subparagraphs_data = [] # Collect all subparagraphs from all paragraphs all_sections = list(bill.sections) for part in bill.parts: all_sections.extend(part.sections) for section in all_sections: for subsection in section.subsections: for paragraph in subsection.paragraphs: for subparagraph in paragraph.subparagraphs: subparagraphs_data.append({ "id": subparagraph.id, "paragraph_id": paragraph.id, "numeral": subparagraph.numeral, "text_en": subparagraph.text_en, "text_fr": subparagraph.text_fr, "anchor_id": subparagraph.anchor_id, "sequence": subparagraph.sequence, "updated_at": now, }) if not subparagraphs_data: return 0 # Create BillSubparagraph nodes cypher = """ UNWIND $subparagraphs AS sp MERGE (bsp:BillSubparagraph {id: sp.id}) SET bsp.paragraph_id = sp.paragraph_id, bsp.numeral = sp.numeral, bsp.text_en = sp.text_en, bsp.text_fr = sp.text_fr, bsp.anchor_id = sp.anchor_id, bsp.sequence = sp.sequence, bsp.updated_at = datetime(sp.updated_at) WITH bsp, sp MATCH (bpg:BillParagraph {id: sp.paragraph_id}) MERGE (bpg)-[:HAS_SUBPARAGRAPH]->(bsp) RETURN count(bsp) as created """ result = neo4j_client.run_query(cypher, {"subparagraphs": subparagraphs_data}) created = result[0]["created"] if result else 0 logger.info(f" ✅ Created {created} BillSubparagraph nodes") return created def ingest_bill_definitions( neo4j_client: Neo4jClient, bill: ParsedBill, ) -> int: """Ingest bill definition nodes and relationships. Args: neo4j_client: Neo4j client instance bill: Parsed bill with definitions Returns: Number of definitions created """ if not bill.definitions: return 0 bill_id = f"{bill.session_str}:{bill.bill_number}" now = datetime.utcnow().isoformat() definitions_data = [] for i, defn in enumerate(bill.definitions, start=1): def_id = f"{bill_id}:def:{i}" definitions_data.append({ "id": def_id, "bill_id": bill_id, "term_en": defn.term_en, "term_fr": defn.term_fr, "definition_en": defn.definition_en, "definition_fr": defn.definition_fr, "sequence": i, "updated_at": now, }) # Create BillDefinition nodes cypher = """ UNWIND $definitions AS d MERGE (bd:BillDefinition {id: d.id}) SET bd.bill_id = d.bill_id, bd.term_en = d.term_en, bd.term_fr = d.term_fr, bd.definition_en = d.definition_en, bd.definition_fr = d.definition_fr, bd.sequence = d.sequence, bd.updated_at = datetime(d.updated_at) WITH bd, d, split(d.bill_id, ':') AS parts MATCH (b:Bill {session: parts[0], number: parts[1]}) MERGE (b)-[:HAS_DEFINITION]->(bd) RETURN count(bd) as created """ result = neo4j_client.run_query(cypher, {"definitions": definitions_data}) created = result[0]["created"] if result else 0 logger.info(f" ✅ Created {created} BillDefinition nodes") return created def ingest_bill_structure( neo4j_client: Neo4jClient, parliament: int, session: int, bill_number: str, *, version: int = 1, is_government: bool = False, include_all_versions: bool = True, ) -> Dict[str, int]: """Ingest complete bill structure from Parliament.ca XML. This is the main entry point for ingesting a single bill's structure. Args: neo4j_client: Neo4j client instance parliament: Parliament number (e.g., 44) session: Session number (e.g., 1) bill_number: Bill code (e.g., "C-2") version: Version number to parse (1=first reading, etc.) is_government: True for government bills include_all_versions: If True, fetch and store all available versions Returns: Dictionary with counts for each node type created """ session_str = f"{parliament}-{session}" bill_id = f"{session_str}:{bill_number.upper()}" logger.info(f"Ingesting bill structure for {bill_id}...") # First check if bill exists in Neo4j (match by number and session, not id) check_result = neo4j_client.run_query( "MATCH (b:Bill {number: $number, session: $session}) RETURN b.number", {"number": bill_number.upper(), "session": session_str} ) if not check_result: logger.warning(f" Bill {bill_id} not found in Neo4j, skipping structure ingestion") return {"error": f"Bill {bill_id} not found"} # Fetch and parse bill with history client = BillTextXMLClient() try: bill = client.parse_bill_with_history( parliament=parliament, session=session, bill_number=bill_number, version=version, is_government=is_government, include_all_versions=include_all_versions, ) except Exception as e: logger.error(f" Failed to fetch/parse bill {bill_id}: {e}") return {"error": str(e)} results = { "bill": bill_id, "versions": 0, "amendment_events": 0, "parts": 0, "sections": 0, "subsections": 0, "paragraphs": 0, "subparagraphs": 0, "definitions": 0, } # Ingest all structure components results["versions"] = ingest_bill_versions(neo4j_client, bill) results["amendment_events"] = ingest_amendment_events(neo4j_client, bill) results["parts"] = ingest_bill_parts(neo4j_client, bill) results["sections"] = ingest_bill_sections(neo4j_client, bill) results["subsections"] = ingest_bill_subsections(neo4j_client, bill) results["paragraphs"] = ingest_bill_paragraphs(neo4j_client, bill) results["subparagraphs"] = ingest_bill_subparagraphs(neo4j_client, bill) results["definitions"] = ingest_bill_definitions(neo4j_client, bill) total = sum(v for k, v in results.items() if k != "bill" and k != "error") logger.info(f"✅ Ingested {total} total nodes for {bill_id}") return results def ingest_bills_from_list( neo4j_client: Neo4jClient, bills: List[Tuple[str, str]], *, default_version: int = 1, ) -> Dict[str, Any]: """Ingest multiple bills from a list of (session, bill_number) tuples. Args: neo4j_client: Neo4j client instance bills: List of (session, bill_number) tuples, e.g., [("44-1", "C-2"), ("45-1", "C-234")] default_version: Default version to parse if not specified Returns: Dictionary with overall statistics and per-bill results """ logger.info(f"Ingesting structure for {len(bills)} bills...") progress = ProgressTracker( total=len(bills), desc="Ingesting bill structures", unit="bills" ) results = { "total_bills": len(bills), "successful": 0, "failed": 0, "totals": { "versions": 0, "amendment_events": 0, "parts": 0, "sections": 0, "subsections": 0, "paragraphs": 0, "subparagraphs": 0, "definitions": 0, }, "per_bill": [], } for session_str, bill_number in bills: # Parse session string (e.g., "44-1" -> parliament=44, session=1) parts = session_str.split("-") if len(parts) != 2: logger.warning(f"Invalid session format: {session_str}, skipping {bill_number}") results["failed"] += 1 progress.update(1) continue parliament = int(parts[0]) session = int(parts[1]) # Check if government bill from database bill_info = neo4j_client.run_query( "MATCH (b:Bill {number: $number, session: $session}) RETURN b.is_government_bill as is_gov", {"number": bill_number.upper(), "session": session_str} ) is_government = bill_info[0]["is_gov"] if bill_info and bill_info[0].get("is_gov") is not None else False bill_result = ingest_bill_structure( neo4j_client, parliament=parliament, session=session, bill_number=bill_number, version=default_version, is_government=is_government, ) if "error" in bill_result: results["failed"] += 1 else: results["successful"] += 1 for key in results["totals"]: results["totals"][key] += bill_result.get(key, 0) results["per_bill"].append(bill_result) progress.update(1) progress.close() logger.info("=" * 60) logger.info(f"✅ Bill structure ingestion complete:") logger.info(f" Successful: {results['successful']}/{results['total_bills']}") logger.info(f" Failed: {results['failed']}/{results['total_bills']}") logger.info(f" Total nodes created: {sum(results['totals'].values())}") logger.info("=" * 60) return results def ingest_all_bills_in_session( neo4j_client: Neo4jClient, parliament: int, session: int, *, limit: Optional[int] = None, ) -> Dict[str, Any]: """Ingest structure for all bills in a parliamentary session. Fetches all bills from the session in Neo4j and ingests their structure. Args: neo4j_client: Neo4j client instance parliament: Parliament number (e.g., 44) session: Session number (e.g., 1) limit: Optional limit on number of bills to process Returns: Dictionary with overall statistics """ session_str = f"{parliament}-{session}" logger.info(f"Fetching all bills from session {session_str}...") # Get all bills in this session from Neo4j query = """ MATCH (b:Bill) WHERE b.parliament_session = $session_str RETURN b.id AS id, b.number AS number ORDER BY b.number """ if limit: query += f" LIMIT {limit}" results = neo4j_client.run_query(query, {"session_str": session_str}) if not results: logger.warning(f"No bills found in session {session_str}") return {"error": f"No bills found in session {session_str}"} bills = [(session_str, r["number"]) for r in results if r.get("number")] logger.info(f"Found {len(bills)} bills in session {session_str}") return ingest_bills_from_list(neo4j_client, bills) def run_bill_structure_ingestion( neo4j_client: Neo4jClient, *, session_str: Optional[str] = None, bill_numbers: Optional[List[str]] = None, limit: Optional[int] = None, ) -> Dict[str, Any]: """Main entry point for bill structure ingestion. Can be called with: - session_str only: Ingest all bills in that session - session_str + bill_numbers: Ingest specific bills in that session - bill_numbers only with session in format "44-1:C-2": Ingest specific bills Args: neo4j_client: Neo4j client instance session_str: Parliamentary session (e.g., "44-1") bill_numbers: List of bill numbers (e.g., ["C-2", "C-3"]) limit: Optional limit on number of bills Returns: Dictionary with results """ logger.info("=" * 60) logger.info("BILL STRUCTURE INGESTION") logger.info("=" * 60) # Create schema create_bill_structure_schema(neo4j_client) if session_str and bill_numbers: # Specific bills in a session bills = [(session_str, bn) for bn in bill_numbers] return ingest_bills_from_list(neo4j_client, bills) elif session_str: # All bills in a session parts = session_str.split("-") if len(parts) != 2: return {"error": f"Invalid session format: {session_str}"} return ingest_all_bills_in_session( neo4j_client, parliament=int(parts[0]), session=int(parts[1]), limit=limit, ) elif bill_numbers: # Bill numbers with embedded session (e.g., "44-1:C-2") bills = [] for bn in bill_numbers: if ":" in bn: sess, num = bn.split(":", 1) bills.append((sess, num)) else: logger.warning(f"Bill {bn} missing session, skipping") return ingest_bills_from_list(neo4j_client, bills) else: return {"error": "Must specify session_str and/or bill_numbers"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/northernvariables/FedMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server