# We provide all the information about MCP servers via our MCP API:
#   curl -X GET 'https://glama.ai/api/mcp/v1/servers/northernvariables/FedMCP'
# If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""Bill structure ingestion from Parliament.ca XML to Neo4j.
This module imports structured bill content (parts, sections, subsections, paragraphs)
as well as version history and amendment events from LEGISinfo.
Data Sources:
- Parliament.ca Bill XML: /Content/Bills/{parliament}/{type}/{bill}/{bill}_{version}/{bill}_E.xml
- LEGISinfo JSON API: /LegisInfo/en/bill/{session}/{bill}/json
Neo4j Schema:
Nodes:
- BillVersion: Version snapshots through legislative process
- BillAmendmentEvent: Amendment-related events (committee reports, etc.)
- BillPart: Top-level divisions (Part I, Part II)
- BillSection: Main numbered sections (1, 2, 3...)
- BillSubsection: Subsections (1), (2), (3)...
- BillParagraph: Paragraphs (a), (b), (c)...
- BillSubparagraph: Subparagraphs (i), (ii), (iii)...
- BillDefinition: Definitions within bills
Relationships:
- (Bill)-[:HAS_VERSION]->(BillVersion)
- (Bill)-[:HAS_AMENDMENT_EVENT]->(BillAmendmentEvent)
- (Bill)-[:HAS_PART]->(BillPart)
- (Bill)-[:HAS_SECTION]->(BillSection) (for sections not in parts)
- (BillPart)-[:HAS_SECTION]->(BillSection)
- (BillSection)-[:HAS_SUBSECTION]->(BillSubsection)
- (BillSubsection)-[:HAS_PARAGRAPH]->(BillParagraph)
- (BillParagraph)-[:HAS_SUBPARAGRAPH]->(BillSubparagraph)
- (Bill)-[:HAS_DEFINITION]->(BillDefinition)
Example:
>>> from fedmcp_pipeline.utils.neo4j_client import Neo4jClient
>>> neo4j = Neo4jClient(uri="bolt://localhost:7687", user="neo4j", password="password")
>>>
>>> # Ingest a single bill
>>> result = ingest_bill_structure(neo4j, parliament=44, session=1, bill_number="C-2")
>>>
>>> # Ingest multiple bills
>>> results = ingest_bills_from_list(neo4j, bills=[("44-1", "C-2"), ("44-1", "C-3")])
"""
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Add fedmcp package to path
FEDMCP_PATH = Path(__file__).parent.parent.parent.parent / "fedmcp" / "src"
sys.path.insert(0, str(FEDMCP_PATH))
from fedmcp.clients.bill_text_xml import (
BillTextXMLClient,
ParsedBill,
BillVersion,
BillAmendmentEvent,
BillPart,
BillSection,
BillSubsection,
BillParagraph,
BillSubparagraph,
BillDefinition,
to_dict,
)
from ..utils.neo4j_client import Neo4jClient
from ..utils.progress import logger, ProgressTracker
from .bill_full_text_extractor import extract_continuous_text, validate_extracted_text
def create_bill_structure_schema(neo4j_client: Neo4jClient) -> None:
    """Set up the Neo4j constraints and indexes for bill structure nodes.

    Each statement carries ``IF NOT EXISTS`` and is sent on its own through
    ``neo4j_client.run_query``, in the order listed in the function body.

    Creates:
        - A uniqueness constraint on ``id`` for every structure node label
        - ``anchor_id`` indexes on parts, sections, subsections, paragraphs
          and subparagraphs
        - An index on each label's parent reference (``bill_id``,
          ``section_id``, ``subsection_id`` or ``paragraph_id``)

    Args:
        neo4j_client: Neo4j client instance used to run the statements
    """
    logger.info("Creating bill structure schema...")

    schema_statements = (
        # BillVersion
        "\nCREATE CONSTRAINT bill_version_id IF NOT EXISTS\n"
        "FOR (bv:BillVersion) REQUIRE bv.id IS UNIQUE\n",
        "\nCREATE INDEX bill_version_bill_id IF NOT EXISTS\n"
        "FOR (bv:BillVersion) ON (bv.bill_id)\n",
        # BillAmendmentEvent
        "\nCREATE CONSTRAINT bill_amendment_event_id IF NOT EXISTS\n"
        "FOR (bae:BillAmendmentEvent) REQUIRE bae.id IS UNIQUE\n",
        "\nCREATE INDEX bill_amendment_event_bill_id IF NOT EXISTS\n"
        "FOR (bae:BillAmendmentEvent) ON (bae.bill_id)\n",
        # BillPart
        "\nCREATE CONSTRAINT bill_part_id IF NOT EXISTS\n"
        "FOR (bp:BillPart) REQUIRE bp.id IS UNIQUE\n",
        "\nCREATE INDEX bill_part_anchor_id IF NOT EXISTS\n"
        "FOR (bp:BillPart) ON (bp.anchor_id)\n",
        "\nCREATE INDEX bill_part_bill_id IF NOT EXISTS\n"
        "FOR (bp:BillPart) ON (bp.bill_id)\n",
        # BillSection
        "\nCREATE CONSTRAINT bill_section_id IF NOT EXISTS\n"
        "FOR (bs:BillSection) REQUIRE bs.id IS UNIQUE\n",
        "\nCREATE INDEX bill_section_anchor_id IF NOT EXISTS\n"
        "FOR (bs:BillSection) ON (bs.anchor_id)\n",
        "\nCREATE INDEX bill_section_bill_id IF NOT EXISTS\n"
        "FOR (bs:BillSection) ON (bs.bill_id)\n",
        # BillSubsection
        "\nCREATE CONSTRAINT bill_subsection_id IF NOT EXISTS\n"
        "FOR (bss:BillSubsection) REQUIRE bss.id IS UNIQUE\n",
        "\nCREATE INDEX bill_subsection_anchor_id IF NOT EXISTS\n"
        "FOR (bss:BillSubsection) ON (bss.anchor_id)\n",
        "\nCREATE INDEX bill_subsection_section_id IF NOT EXISTS\n"
        "FOR (bss:BillSubsection) ON (bss.section_id)\n",
        # BillParagraph
        "\nCREATE CONSTRAINT bill_paragraph_id IF NOT EXISTS\n"
        "FOR (bpg:BillParagraph) REQUIRE bpg.id IS UNIQUE\n",
        "\nCREATE INDEX bill_paragraph_anchor_id IF NOT EXISTS\n"
        "FOR (bpg:BillParagraph) ON (bpg.anchor_id)\n",
        "\nCREATE INDEX bill_paragraph_subsection_id IF NOT EXISTS\n"
        "FOR (bpg:BillParagraph) ON (bpg.subsection_id)\n",
        # BillSubparagraph
        "\nCREATE CONSTRAINT bill_subparagraph_id IF NOT EXISTS\n"
        "FOR (bsp:BillSubparagraph) REQUIRE bsp.id IS UNIQUE\n",
        "\nCREATE INDEX bill_subparagraph_anchor_id IF NOT EXISTS\n"
        "FOR (bsp:BillSubparagraph) ON (bsp.anchor_id)\n",
        "\nCREATE INDEX bill_subparagraph_paragraph_id IF NOT EXISTS\n"
        "FOR (bsp:BillSubparagraph) ON (bsp.paragraph_id)\n",
        # BillDefinition
        "\nCREATE CONSTRAINT bill_definition_id IF NOT EXISTS\n"
        "FOR (bd:BillDefinition) REQUIRE bd.id IS UNIQUE\n",
        "\nCREATE INDEX bill_definition_bill_id IF NOT EXISTS\n"
        "FOR (bd:BillDefinition) ON (bd.bill_id)\n",
    )

    # One statement per call, in declaration order.
    for statement in schema_statements:
        neo4j_client.run_query(statement)

    logger.info("✅ Bill structure schema created")
# Upserts BillVersion nodes from $versions and links each one to the Bill
# matched on session + number (both parsed out of the "session:number" bill_id).
_MERGE_BILL_VERSIONS_CYPHER = """
UNWIND $versions AS v
MERGE (bv:BillVersion {id: v.id})
SET bv.bill_id = v.bill_id,
bv.version_number = v.version_number,
bv.stage = v.stage,
bv.publication_type_name = v.publication_type_name,
bv.publication_date = CASE WHEN v.publication_date IS NOT NULL
THEN datetime(v.publication_date) ELSE NULL END,
bv.has_amendments = v.has_amendments,
bv.xml_url = v.xml_url,
bv.pdf_url = v.pdf_url,
bv.updated_at = datetime(v.updated_at)
WITH bv, v, split(v.bill_id, ':') AS parts
MATCH (b:Bill {session: parts[0], number: parts[1]})
MERGE (b)-[:HAS_VERSION]->(bv)
RETURN count(bv) as created
"""


def ingest_bill_versions(
    neo4j_client: Neo4jClient,
    bill: ParsedBill,
) -> int:
    """Upsert a BillVersion node for every available version of a bill.

    Version ids have the form ``"{session}:{bill_number}:v{version_number}"``.
    A version without a stage is stored with stage ``"first-reading"``.

    Args:
        neo4j_client: Neo4j client instance
        bill: Parsed bill whose ``available_versions`` are ingested

    Returns:
        The ``created`` count reported by the query, or 0 when the bill has
        no versions or the query returns no rows
    """
    if not bill.available_versions:
        return 0

    bill_id = f"{bill.session_str}:{bill.bill_number}"
    stamp = datetime.utcnow().isoformat()  # one timestamp shared by all rows

    rows = [
        {
            "id": f"{bill_id}:v{ver.version_number}",
            "bill_id": bill_id,
            "version_number": ver.version_number,
            "stage": ver.stage.value if ver.stage else "first-reading",
            "publication_type_name": ver.publication_type_name,
            "publication_date": ver.publication_date.isoformat() if ver.publication_date else None,
            "has_amendments": ver.has_amendments,
            "xml_url": ver.xml_url,
            "pdf_url": ver.pdf_url,
            "updated_at": stamp,
        }
        for ver in bill.available_versions
    ]

    outcome = neo4j_client.run_query(_MERGE_BILL_VERSIONS_CYPHER, {"versions": rows})
    created = outcome[0]["created"] if outcome else 0
    logger.info(f" ✅ Created {created} BillVersion nodes")
    return created
# Upserts BillAmendmentEvent nodes from $events and links each one to the Bill
# matched on session + number (both parsed out of the "session:number" bill_id).
_MERGE_AMENDMENT_EVENTS_CYPHER = """
UNWIND $events AS e
MERGE (bae:BillAmendmentEvent {id: e.id})
SET bae.bill_id = e.bill_id,
bae.event_type = e.event_type,
bae.description_en = e.description_en,
bae.description_fr = e.description_fr,
bae.event_date = CASE WHEN e.event_date IS NOT NULL
THEN datetime(e.event_date) ELSE NULL END,
bae.chamber = e.chamber,
bae.stage = e.stage,
bae.committee_code = e.committee_code,
bae.committee_name = e.committee_name,
bae.report_id = e.report_id,
bae.report_number = e.report_number,
bae.number_of_amendments = e.number_of_amendments,
bae.updated_at = datetime(e.updated_at)
WITH bae, e, split(e.bill_id, ':') AS parts
MATCH (b:Bill {session: parts[0], number: parts[1]})
MERGE (b)-[:HAS_AMENDMENT_EVENT]->(bae)
RETURN count(bae) as created
"""


def ingest_amendment_events(
    neo4j_client: Neo4jClient,
    bill: ParsedBill,
) -> int:
    """Upsert a BillAmendmentEvent node for every amendment event of a bill.

    Event ids have the form ``"{session}:{bill_number}:amend:{n}"``, where
    ``n`` is the 1-based position of the event in ``bill.amendment_events``.

    Args:
        neo4j_client: Neo4j client instance
        bill: Parsed bill whose ``amendment_events`` are ingested

    Returns:
        The ``created`` count reported by the query, or 0 when the bill has
        no amendment events or the query returns no rows
    """
    if not bill.amendment_events:
        return 0

    bill_id = f"{bill.session_str}:{bill.bill_number}"
    stamp = datetime.utcnow().isoformat()  # one timestamp shared by all rows

    rows = [
        {
            "id": f"{bill_id}:amend:{position}",
            "bill_id": bill_id,
            "event_type": event.event_type,
            "description_en": event.description_en,
            "description_fr": event.description_fr,
            "event_date": event.event_date.isoformat() if event.event_date else None,
            "chamber": event.chamber,
            "stage": event.stage,
            "committee_code": event.committee_code,
            "committee_name": event.committee_name,
            "report_id": event.report_id,
            "report_number": event.report_number,
            "number_of_amendments": event.number_of_amendments,
            "updated_at": stamp,
        }
        for position, event in enumerate(bill.amendment_events, start=1)
    ]

    outcome = neo4j_client.run_query(_MERGE_AMENDMENT_EVENTS_CYPHER, {"events": rows})
    created = outcome[0]["created"] if outcome else 0
    logger.info(f" ✅ Created {created} BillAmendmentEvent nodes")
    return created
# Upserts BillPart nodes from $parts and links each one to the Bill matched on
# session + number (both parsed out of the "session:number" bill_id).
_MERGE_BILL_PARTS_CYPHER = """
UNWIND $parts AS p
MERGE (bp:BillPart {id: p.id})
SET bp.bill_id = p.bill_id,
bp.number = p.number,
bp.title_en = p.title_en,
bp.title_fr = p.title_fr,
bp.anchor_id = p.anchor_id,
bp.sequence = p.sequence,
bp.updated_at = datetime(p.updated_at)
WITH bp, p, split(p.bill_id, ':') AS parts
MATCH (b:Bill {session: parts[0], number: parts[1]})
MERGE (b)-[:HAS_PART]->(bp)
RETURN count(bp) as created
"""


def ingest_bill_parts(
    neo4j_client: Neo4jClient,
    bill: ParsedBill,
) -> int:
    """Upsert a BillPart node for every part of a bill.

    Args:
        neo4j_client: Neo4j client instance
        bill: Parsed bill whose ``parts`` are ingested

    Returns:
        The ``created`` count reported by the query, or 0 when the bill has
        no parts or the query returns no rows
    """
    if not bill.parts:
        return 0

    bill_id = f"{bill.session_str}:{bill.bill_number}"
    stamp = datetime.utcnow().isoformat()  # one timestamp shared by all rows

    rows = [
        {
            "id": part.id,
            "bill_id": bill_id,
            "number": part.number,
            "title_en": part.title_en,
            "title_fr": part.title_fr,
            "anchor_id": part.anchor_id,
            "sequence": part.sequence,
            "updated_at": stamp,
        }
        for part in bill.parts
    ]

    outcome = neo4j_client.run_query(_MERGE_BILL_PARTS_CYPHER, {"parts": rows})
    created = outcome[0]["created"] if outcome else 0
    logger.info(f" ✅ Created {created} BillPart nodes")
    return created
# Upserts BillSection nodes from $sections (no relationships).
_MERGE_BILL_SECTIONS_CYPHER = """
UNWIND $sections AS s
MERGE (bs:BillSection {id: s.id})
SET bs.bill_id = s.bill_id,
bs.part_id = s.part_id,
bs.number = s.number,
bs.marginal_note_en = s.marginal_note_en,
bs.marginal_note_fr = s.marginal_note_fr,
bs.text_en = s.text_en,
bs.text_fr = s.text_fr,
bs.anchor_id = s.anchor_id,
bs.sequence = s.sequence,
bs.updated_at = datetime(s.updated_at)
RETURN count(bs) as created
"""

# Links sections that carry a part_id to their BillPart.
_LINK_SECTIONS_TO_PARTS_CYPHER = """
UNWIND $sections AS s
MATCH (bs:BillSection {id: s.id})
MATCH (bp:BillPart {id: s.part_id})
WHERE s.part_id IS NOT NULL
MERGE (bp)-[:HAS_SECTION]->(bs)
"""

# Links sections without a part_id directly to the Bill. Bill nodes are matched
# on session + number, obtained by splitting the "session:number" bill_id.
_LINK_SECTIONS_TO_BILL_CYPHER = """
UNWIND $sections AS s
MATCH (bs:BillSection {id: s.id})
WITH bs, s, split(s.bill_id, ':') AS parts
MATCH (b:Bill {session: parts[0], number: parts[1]})
WHERE s.part_id IS NULL
MERGE (b)-[:HAS_SECTION]->(bs)
"""


def ingest_bill_sections(
    neo4j_client: Neo4jClient,
    bill: ParsedBill,
) -> int:
    """Upsert BillSection nodes and attach them to their part or bill.

    Two kinds of sections are handled:
        - Sections inside a part: (BillPart)-[:HAS_SECTION]->(BillSection)
        - Loose sections (no part): (Bill)-[:HAS_SECTION]->(BillSection)

    Args:
        neo4j_client: Neo4j client instance
        bill: Parsed bill with ``parts`` (each holding ``sections``) and
            top-level ``sections``

    Returns:
        The ``created`` count reported by the node query, or 0 when the bill
        has no sections or the query returns no rows
    """
    bill_id = f"{bill.session_str}:{bill.bill_number}"
    stamp = datetime.utcnow().isoformat()  # one timestamp shared by all rows

    def as_row(section, part_id):
        # part_id is None for sections that hang directly off the bill.
        return {
            "id": section.id,
            "bill_id": bill_id,
            "part_id": part_id,
            "number": section.number,
            "marginal_note_en": section.marginal_note_en,
            "marginal_note_fr": section.marginal_note_fr,
            "text_en": section.text_en,
            "text_fr": section.text_fr,
            "anchor_id": section.anchor_id,
            "sequence": section.sequence,
            "updated_at": stamp,
        }

    # Part sections first, loose sections afterwards.
    rows = [as_row(section, part.id) for part in bill.parts for section in part.sections]
    rows.extend(as_row(section, None) for section in bill.sections)

    if not rows:
        return 0

    payload = {"sections": rows}

    outcome = neo4j_client.run_query(_MERGE_BILL_SECTIONS_CYPHER, payload)
    created = outcome[0]["created"] if outcome else 0

    # The same row list feeds both relationship queries; each query filters
    # on part_id itself.
    neo4j_client.run_query(_LINK_SECTIONS_TO_PARTS_CYPHER, payload)
    neo4j_client.run_query(_LINK_SECTIONS_TO_BILL_CYPHER, payload)

    logger.info(f" ✅ Created {created} BillSection nodes")
    return created
# Upserts BillSubsection nodes from $subsections and links each one to its
# parent BillSection.
_MERGE_BILL_SUBSECTIONS_CYPHER = """
UNWIND $subsections AS ss
MERGE (bss:BillSubsection {id: ss.id})
SET bss.section_id = ss.section_id,
bss.number = ss.number,
bss.text_en = ss.text_en,
bss.text_fr = ss.text_fr,
bss.anchor_id = ss.anchor_id,
bss.sequence = ss.sequence,
bss.updated_at = datetime(ss.updated_at)
WITH bss, ss
MATCH (bs:BillSection {id: ss.section_id})
MERGE (bs)-[:HAS_SUBSECTION]->(bss)
RETURN count(bss) as created
"""


def ingest_bill_subsections(
    neo4j_client: Neo4jClient,
    bill: ParsedBill,
) -> int:
    """Upsert a BillSubsection node for every subsection of every section.

    Args:
        neo4j_client: Neo4j client instance
        bill: Parsed bill with ``parts`` (each holding ``sections``) and
            top-level ``sections``

    Returns:
        The ``created`` count reported by the query, or 0 when the bill has
        no subsections or the query returns no rows
    """
    stamp = datetime.utcnow().isoformat()  # one timestamp shared by all rows

    # Loose sections first, then the sections of each part.
    sections = [*bill.sections, *(sec for part in bill.parts for sec in part.sections)]

    rows = [
        {
            "id": subsection.id,
            "section_id": section.id,
            "number": subsection.number,
            "text_en": subsection.text_en,
            "text_fr": subsection.text_fr,
            "anchor_id": subsection.anchor_id,
            "sequence": subsection.sequence,
            "updated_at": stamp,
        }
        for section in sections
        for subsection in section.subsections
    ]

    if not rows:
        return 0

    outcome = neo4j_client.run_query(_MERGE_BILL_SUBSECTIONS_CYPHER, {"subsections": rows})
    created = outcome[0]["created"] if outcome else 0
    logger.info(f" ✅ Created {created} BillSubsection nodes")
    return created
# Upserts BillParagraph nodes from $paragraphs and links each one to its
# parent BillSubsection.
_MERGE_BILL_PARAGRAPHS_CYPHER = """
UNWIND $paragraphs AS p
MERGE (bpg:BillParagraph {id: p.id})
SET bpg.subsection_id = p.subsection_id,
bpg.letter = p.letter,
bpg.text_en = p.text_en,
bpg.text_fr = p.text_fr,
bpg.anchor_id = p.anchor_id,
bpg.sequence = p.sequence,
bpg.updated_at = datetime(p.updated_at)
WITH bpg, p
MATCH (bss:BillSubsection {id: p.subsection_id})
MERGE (bss)-[:HAS_PARAGRAPH]->(bpg)
RETURN count(bpg) as created
"""


def ingest_bill_paragraphs(
    neo4j_client: Neo4jClient,
    bill: ParsedBill,
) -> int:
    """Upsert a BillParagraph node for every paragraph of every subsection.

    Args:
        neo4j_client: Neo4j client instance
        bill: Parsed bill with ``parts`` (each holding ``sections``) and
            top-level ``sections``

    Returns:
        The ``created`` count reported by the query, or 0 when the bill has
        no paragraphs or the query returns no rows
    """
    stamp = datetime.utcnow().isoformat()  # one timestamp shared by all rows

    # Loose sections first, then the sections of each part.
    sections = [*bill.sections, *(sec for part in bill.parts for sec in part.sections)]

    rows = [
        {
            "id": paragraph.id,
            "subsection_id": subsection.id,
            "letter": paragraph.letter,
            "text_en": paragraph.text_en,
            "text_fr": paragraph.text_fr,
            "anchor_id": paragraph.anchor_id,
            "sequence": paragraph.sequence,
            "updated_at": stamp,
        }
        for section in sections
        for subsection in section.subsections
        for paragraph in subsection.paragraphs
    ]

    if not rows:
        return 0

    outcome = neo4j_client.run_query(_MERGE_BILL_PARAGRAPHS_CYPHER, {"paragraphs": rows})
    created = outcome[0]["created"] if outcome else 0
    logger.info(f" ✅ Created {created} BillParagraph nodes")
    return created
# Upserts BillSubparagraph nodes from $subparagraphs and links each one to its
# parent BillParagraph.
_MERGE_BILL_SUBPARAGRAPHS_CYPHER = """
UNWIND $subparagraphs AS sp
MERGE (bsp:BillSubparagraph {id: sp.id})
SET bsp.paragraph_id = sp.paragraph_id,
bsp.numeral = sp.numeral,
bsp.text_en = sp.text_en,
bsp.text_fr = sp.text_fr,
bsp.anchor_id = sp.anchor_id,
bsp.sequence = sp.sequence,
bsp.updated_at = datetime(sp.updated_at)
WITH bsp, sp
MATCH (bpg:BillParagraph {id: sp.paragraph_id})
MERGE (bpg)-[:HAS_SUBPARAGRAPH]->(bsp)
RETURN count(bsp) as created
"""


def ingest_bill_subparagraphs(
    neo4j_client: Neo4jClient,
    bill: ParsedBill,
) -> int:
    """Upsert a BillSubparagraph node for every subparagraph of every paragraph.

    Args:
        neo4j_client: Neo4j client instance
        bill: Parsed bill with ``parts`` (each holding ``sections``) and
            top-level ``sections``

    Returns:
        The ``created`` count reported by the query, or 0 when the bill has
        no subparagraphs or the query returns no rows
    """
    stamp = datetime.utcnow().isoformat()  # one timestamp shared by all rows

    # Loose sections first, then the sections of each part.
    sections = [*bill.sections, *(sec for part in bill.parts for sec in part.sections)]

    rows = [
        {
            "id": subparagraph.id,
            "paragraph_id": paragraph.id,
            "numeral": subparagraph.numeral,
            "text_en": subparagraph.text_en,
            "text_fr": subparagraph.text_fr,
            "anchor_id": subparagraph.anchor_id,
            "sequence": subparagraph.sequence,
            "updated_at": stamp,
        }
        for section in sections
        for subsection in section.subsections
        for paragraph in subsection.paragraphs
        for subparagraph in paragraph.subparagraphs
    ]

    if not rows:
        return 0

    outcome = neo4j_client.run_query(_MERGE_BILL_SUBPARAGRAPHS_CYPHER, {"subparagraphs": rows})
    created = outcome[0]["created"] if outcome else 0
    logger.info(f" ✅ Created {created} BillSubparagraph nodes")
    return created
# Upserts BillDefinition nodes from $definitions and links each one to the Bill
# matched on session + number (both parsed out of the "session:number" bill_id).
_MERGE_BILL_DEFINITIONS_CYPHER = """
UNWIND $definitions AS d
MERGE (bd:BillDefinition {id: d.id})
SET bd.bill_id = d.bill_id,
bd.term_en = d.term_en,
bd.term_fr = d.term_fr,
bd.definition_en = d.definition_en,
bd.definition_fr = d.definition_fr,
bd.sequence = d.sequence,
bd.updated_at = datetime(d.updated_at)
WITH bd, d, split(d.bill_id, ':') AS parts
MATCH (b:Bill {session: parts[0], number: parts[1]})
MERGE (b)-[:HAS_DEFINITION]->(bd)
RETURN count(bd) as created
"""


def ingest_bill_definitions(
    neo4j_client: Neo4jClient,
    bill: ParsedBill,
) -> int:
    """Upsert a BillDefinition node for every definition of a bill.

    Definition ids have the form ``"{session}:{bill_number}:def:{n}"``, where
    ``n`` is the 1-based position of the definition in ``bill.definitions``;
    the same ``n`` is stored as the node's ``sequence``.

    Args:
        neo4j_client: Neo4j client instance
        bill: Parsed bill whose ``definitions`` are ingested

    Returns:
        The ``created`` count reported by the query, or 0 when the bill has
        no definitions or the query returns no rows
    """
    if not bill.definitions:
        return 0

    bill_id = f"{bill.session_str}:{bill.bill_number}"
    stamp = datetime.utcnow().isoformat()  # one timestamp shared by all rows

    rows = [
        {
            "id": f"{bill_id}:def:{position}",
            "bill_id": bill_id,
            "term_en": defn.term_en,
            "term_fr": defn.term_fr,
            "definition_en": defn.definition_en,
            "definition_fr": defn.definition_fr,
            "sequence": position,
            "updated_at": stamp,
        }
        for position, defn in enumerate(bill.definitions, start=1)
    ]

    outcome = neo4j_client.run_query(_MERGE_BILL_DEFINITIONS_CYPHER, {"definitions": rows})
    created = outcome[0]["created"] if outcome else 0
    logger.info(f" ✅ Created {created} BillDefinition nodes")
    return created
def ingest_bill_full_text(
    neo4j_client: Neo4jClient,
    bill: ParsedBill,
    language: str = 'en',
) -> bool:
    """Extract and store full narrative text for a bill.

    The text is written to the ``full_text_{language}`` property of the Bill
    node matched on ``bill.session_str`` and ``bill.bill_number``, and
    ``full_text_updated_at`` is refreshed.

    Args:
        neo4j_client: Neo4j client instance
        bill: Parsed bill structure
        language: 'en' for English or 'fr' for French. Must consist of ASCII
            letters only, because it becomes part of a property name in the
            query text.

    Returns:
        True if the text was extracted, validated and stored; False if the
        language code is rejected, validation fails, no Bill node was
        updated, or any exception occurs
    """
    # The language code is spliced into the Cypher text below, so anything
    # other than plain ASCII letters could alter or break the query.
    if not (isinstance(language, str) and language.isascii() and language.isalpha()):
        logger.error(f" ❌ Invalid language code for full text: {language!r}")
        return False

    try:
        # Extract continuous text
        logger.info(f" Extracting full text ({language})...")
        full_text = extract_continuous_text(bill, language=language)

        # Validate quality
        if not validate_extracted_text(full_text, bill):
            logger.warning(f" ⚠️ Text validation failed for {language}")
            return False
        logger.info(f" Extracted {len(full_text):,} characters")

        # Update Bill node
        field_name = f"full_text_{language}"
        cypher = (
            "\n"
            "MATCH (b:Bill {session: $session, number: $number})\n"
            f"SET b.{field_name} = $full_text,\n"
            "b.full_text_updated_at = datetime()\n"
            "RETURN b.number as updated\n"
        )
        result = neo4j_client.run_query(cypher, {
            "session": bill.session_str,
            "number": bill.bill_number,
            "full_text": full_text,
        })

        # An empty result means the MATCH found no Bill node to update.
        if not result:
            logger.warning(f" ⚠️ Failed to update Bill node with full text ({language})")
            return False

        logger.info(f" ✅ Stored full text ({language}) for {bill.bill_number}")
        return True
    except Exception as e:
        # Best-effort step: report the failure instead of aborting the
        # surrounding ingestion.
        logger.error(f" ❌ Failed to extract full text ({language}): {e}")
        return False
def ingest_bill_structure(
    neo4j_client: Neo4jClient,
    parliament: int,
    session: int,
    bill_number: str,
    *,
    version: int = 1,
    is_government: bool = False,
    include_all_versions: bool = True,
    include_full_text: bool = True,
) -> Dict[str, Any]:
    """Ingest complete bill structure from Parliament.ca XML.

    This is the main entry point for ingesting a single bill's structure.
    The Bill node must already exist in Neo4j (matched on upper-cased number
    and ``"{parliament}-{session}"``); otherwise nothing is fetched.

    Args:
        neo4j_client: Neo4j client instance
        parliament: Parliament number (e.g., 44)
        session: Session number (e.g., 1)
        bill_number: Bill code (e.g., "C-2")
        version: Version number to parse (1=first reading, etc.)
        is_government: True for government bills
        include_all_versions: If True, fetch and store all available versions
        include_full_text: If True, extract and store full narrative text (en + fr)

    Returns:
        On success, a dictionary with ``bill``, one count per node type and
        ``full_text_en_extracted`` / ``full_text_fr_extracted`` (True/False,
        or None when full text was not requested). When the Bill node is
        missing or fetching/parsing fails, ``{"error": <message>}``.
    """
    session_str = f"{parliament}-{session}"
    bill_id = f"{session_str}:{bill_number.upper()}"
    logger.info(f"Ingesting bill structure for {bill_id}...")

    # First check if bill exists in Neo4j (match by number and session, not id)
    check_result = neo4j_client.run_query(
        "MATCH (b:Bill {number: $number, session: $session}) RETURN b.number",
        {"number": bill_number.upper(), "session": session_str}
    )
    if not check_result:
        logger.warning(f" Bill {bill_id} not found in Neo4j, skipping structure ingestion")
        return {"error": f"Bill {bill_id} not found"}

    # Fetch and parse bill with history
    client = BillTextXMLClient()
    try:
        bill = client.parse_bill_with_history(
            parliament=parliament,
            session=session,
            bill_number=bill_number,
            version=version,
            is_government=is_government,
            include_all_versions=include_all_versions,
        )
    except Exception as e:
        logger.error(f" Failed to fetch/parse bill {bill_id}: {e}")
        return {"error": str(e)}

    results: Dict[str, Any] = {
        "bill": bill_id,
        "versions": 0,
        "amendment_events": 0,
        "parts": 0,
        "sections": 0,
        "subsections": 0,
        "paragraphs": 0,
        "subparagraphs": 0,
        "definitions": 0,
    }

    # Ingest all structure components, parents before children so the
    # relationship MATCHes in the child queries can find their parents.
    results["versions"] = ingest_bill_versions(neo4j_client, bill)
    results["amendment_events"] = ingest_amendment_events(neo4j_client, bill)
    results["parts"] = ingest_bill_parts(neo4j_client, bill)
    results["sections"] = ingest_bill_sections(neo4j_client, bill)
    results["subsections"] = ingest_bill_subsections(neo4j_client, bill)
    results["paragraphs"] = ingest_bill_paragraphs(neo4j_client, bill)
    results["subparagraphs"] = ingest_bill_subparagraphs(neo4j_client, bill)
    results["definitions"] = ingest_bill_definitions(neo4j_client, bill)

    # Extract and store full narrative text (if requested)
    if include_full_text:
        logger.info("Extracting full narrative text...")

        # English text
        results['full_text_en_extracted'] = ingest_bill_full_text(
            neo4j_client, bill, language='en'
        )

        # French text (fetch French XML separately)
        try:
            logger.info(" Fetching French version...")
            bill_fr = client.parse_bill(
                parliament=parliament,
                session=session,
                bill_number=bill_number,
                version=version,
                is_government=is_government,
                language='F',  # French XML
            )
            results['full_text_fr_extracted'] = ingest_bill_full_text(
                neo4j_client, bill_fr, language='fr'
            )
        except Exception as e:
            logger.warning(f" ⚠️ French text extraction failed: {e}")
            results['full_text_fr_extracted'] = False
    else:
        results['full_text_en_extracted'] = None
        results['full_text_fr_extracted'] = None

    # bool is a subclass of int, so the full_text_* flags must be excluded
    # explicitly or each True would add 1 to the node total.
    total = sum(
        value
        for key, value in results.items()
        if key not in ("bill", "error")
        and isinstance(value, int)
        and not isinstance(value, bool)
    )
    logger.info(f"✅ Ingested {total} total nodes for {bill_id}")
    return results
def ingest_bills_from_list(
    neo4j_client: Neo4jClient,
    bills: List[Tuple[str, str]],
    *,
    default_version: int = 1,
) -> Dict[str, Any]:
    """Ingest multiple bills from a list of (session, bill_number) tuples.

    Entries whose session is not of the form ``"<int>-<int>"`` are logged,
    counted as failed and skipped; they do not appear in ``per_bill``.

    Args:
        neo4j_client: Neo4j client instance
        bills: List of (session, bill_number) tuples, e.g., [("44-1", "C-2"), ("45-1", "C-234")]
        default_version: Version number passed to ``ingest_bill_structure``
            for every bill

    Returns:
        Dictionary with ``total_bills``, ``successful``, ``failed``, summed
        node counts under ``totals`` and the individual results under
        ``per_bill``
    """
    logger.info(f"Ingesting structure for {len(bills)} bills...")
    progress = ProgressTracker(
        total=len(bills),
        desc="Ingesting bill structures",
        unit="bills"
    )
    results: Dict[str, Any] = {
        "total_bills": len(bills),
        "successful": 0,
        "failed": 0,
        "totals": {
            "versions": 0,
            "amendment_events": 0,
            "parts": 0,
            "sections": 0,
            "subsections": 0,
            "paragraphs": 0,
            "subparagraphs": 0,
            "definitions": 0,
        },
        "per_bill": [],
    }

    try:
        for session_str, bill_number in bills:
            # Parse session string (e.g., "44-1" -> parliament=44, session=1).
            # Both a wrong number of "-" separated parts and non-numeric
            # parts raise ValueError here and are treated as a bad entry.
            try:
                parliament_text, session_text = session_str.split("-")
                parliament = int(parliament_text)
                session = int(session_text)
            except ValueError:
                logger.warning(f"Invalid session format: {session_str}, skipping {bill_number}")
                results["failed"] += 1
                progress.update(1)
                continue

            # Check if government bill from database
            bill_info = neo4j_client.run_query(
                "MATCH (b:Bill {number: $number, session: $session}) RETURN b.is_government_bill as is_gov",
                {"number": bill_number.upper(), "session": session_str}
            )
            # Missing bill or unset flag both fall back to False.
            is_gov_flag = bill_info[0].get("is_gov") if bill_info else None
            is_government = is_gov_flag if is_gov_flag is not None else False

            bill_result = ingest_bill_structure(
                neo4j_client,
                parliament=parliament,
                session=session,
                bill_number=bill_number,
                version=default_version,
                is_government=is_government,
            )

            if "error" in bill_result:
                results["failed"] += 1
            else:
                results["successful"] += 1
                for key in results["totals"]:
                    results["totals"][key] += bill_result.get(key, 0)

            results["per_bill"].append(bill_result)
            progress.update(1)
    finally:
        # Release the progress tracker even if a bill raises.
        progress.close()

    logger.info("=" * 60)
    logger.info(f"✅ Bill structure ingestion complete:")
    logger.info(f" Successful: {results['successful']}/{results['total_bills']}")
    logger.info(f" Failed: {results['failed']}/{results['total_bills']}")
    logger.info(f" Total nodes created: {sum(results['totals'].values())}")
    logger.info("=" * 60)
    return results
def ingest_all_bills_in_session(
    neo4j_client: Neo4jClient,
    parliament: int,
    session: int,
    *,
    limit: Optional[int] = None,
) -> Dict[str, Any]:
    """Ingest structure for all bills in a parliamentary session.

    Fetches the bill numbers of the session from Neo4j (ordered by number)
    and hands them to ``ingest_bills_from_list``.

    Args:
        neo4j_client: Neo4j client instance
        parliament: Parliament number (e.g., 44)
        session: Session number (e.g., 1)
        limit: Optional limit on number of bills to process; ``None`` or 0
            means no limit

    Returns:
        The dictionary produced by ``ingest_bills_from_list``, or
        ``{"error": ...}`` when the session query returns no bills

    Raises:
        ValueError: If ``limit`` is truthy but cannot be converted to an int
        TypeError: If ``limit`` is truthy and of a type ``int()`` rejects
    """
    session_str = f"{parliament}-{session}"
    logger.info(f"Fetching all bills from session {session_str}...")

    # Get all bills in this session from Neo4j.
    # NOTE(review): every other query in this module matches Bill nodes on
    # `session`; this one filters on `parliament_session` — confirm that Bill
    # nodes carry both properties.
    query = (
        "\n"
        "MATCH (b:Bill)\n"
        "WHERE b.parliament_session = $session_str\n"
        "RETURN b.id AS id, b.number AS number\n"
        "ORDER BY b.number\n"
    )
    params: Dict[str, Any] = {"session_str": session_str}
    if limit:
        # Pass the limit as a query parameter rather than splicing it into
        # the Cypher text; int() rejects anything that is not a number.
        query += " LIMIT $limit"
        params["limit"] = int(limit)

    results = neo4j_client.run_query(query, params)
    if not results:
        logger.warning(f"No bills found in session {session_str}")
        return {"error": f"No bills found in session {session_str}"}

    # Rows without a bill number cannot be ingested and are dropped.
    bills = [(session_str, r["number"]) for r in results if r.get("number")]
    logger.info(f"Found {len(bills)} bills in session {session_str}")
    return ingest_bills_from_list(neo4j_client, bills)
def run_bill_structure_ingestion(
    neo4j_client: Neo4jClient,
    *,
    session_str: Optional[str] = None,
    bill_numbers: Optional[List[str]] = None,
    limit: Optional[int] = None,
) -> Dict[str, Any]:
    """Main entry point for bill structure ingestion.

    The schema (constraints and indexes) is created first, before the
    arguments are examined.

    Can be called with:
        - session_str only: Ingest all bills in that session
        - session_str + bill_numbers: Ingest specific bills in that session
        - bill_numbers only with session in format "44-1:C-2": Ingest specific bills

    Args:
        neo4j_client: Neo4j client instance
        session_str: Parliamentary session (e.g., "44-1")
        bill_numbers: List of bill numbers (e.g., ["C-2", "C-3"])
        limit: Optional limit on number of bills; only used when ingesting a
            whole session (``session_str`` without ``bill_numbers``)

    Returns:
        Dictionary with results, or ``{"error": ...}`` when neither argument
        is given or ``session_str`` is not of the form ``"<int>-<int>"`` in
        the session-only case
    """
    logger.info("=" * 60)
    logger.info("BILL STRUCTURE INGESTION")
    logger.info("=" * 60)

    # Create schema
    create_bill_structure_schema(neo4j_client)

    if session_str and bill_numbers:
        # Specific bills in a session
        bills = [(session_str, bn) for bn in bill_numbers]
        return ingest_bills_from_list(neo4j_client, bills)

    if session_str:
        # All bills in a session. A wrong number of parts and non-numeric
        # parts both raise ValueError and yield the same error result.
        try:
            parliament_text, session_text = session_str.split("-")
            parliament = int(parliament_text)
            session = int(session_text)
        except ValueError:
            return {"error": f"Invalid session format: {session_str}"}
        return ingest_all_bills_in_session(
            neo4j_client,
            parliament=parliament,
            session=session,
            limit=limit,
        )

    if bill_numbers:
        # Bill numbers with embedded session (e.g., "44-1:C-2")
        bills = []
        for bn in bill_numbers:
            if ":" in bn:
                sess, num = bn.split(":", 1)
                bills.append((sess, num))
            else:
                logger.warning(f"Bill {bn} missing session, skipping")
        return ingest_bills_from_list(neo4j_client, bills)

    return {"error": "Must specify session_str and/or bill_numbers"}