Skip to main content
Glama
northernvariables

FedMCP - Federal Parliamentary Information

enrich_bills.py (14.2 kB)
#!/usr/bin/env python3
"""
Enrich bill data with missing fields from LEGISinfo individual bill API.

This script fetches detailed bill information from LEGISinfo and populates:
- summary: Legislative summary text
- bill_type: Bill document type (e.g., "Government Bill", "Private Member's Bill")
- is_government_bill: Boolean flag
- is_private_member_bill: Boolean flag
- originating_chamber: House or Senate
- latest_event: Most recent bill event
- is_proforma: Pro forma bill flag
- statute_year: Year if passed into law
- statute_chapter: Chapter number if passed into law

Committee relationships are also created if available in bill stages.

Usage:
    python scripts/enrich_bills.py [--limit N] [--dry-run] [--env-file PATH]

Options:
    --limit N: Only process first N bills (for testing)
    --dry-run: Show what would be updated without making changes
    --env-file PATH: Path to .env file (default: packages/data-pipeline/.env)
"""

import sys
import os
import time
import argparse
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional

# Add packages to path
SCRIPT_DIR = Path(__file__).parent
PROJECT_ROOT = SCRIPT_DIR.parent
sys.path.insert(0, str(PROJECT_ROOT / "src"))
sys.path.insert(0, str(PROJECT_ROOT / "packages" / "data-pipeline"))

from dotenv import load_dotenv
from fedmcp.clients.legisinfo import LegisInfoClient
from fedmcp_pipeline.utils.neo4j_client import Neo4jClient


class BillEnricher:
    """Enriches bill data from LEGISinfo API."""

    def __init__(self, neo4j_client: Neo4jClient, dry_run: bool = False):
        """
        Args:
            neo4j_client: Client used for all database reads and writes.
            dry_run: When True, report what would change without writing.
        """
        self.neo4j = neo4j_client
        self.legis = LegisInfoClient()
        self.dry_run = dry_run
        # Counters reported at the end of run().
        self.stats = {
            "total_bills": 0,
            "enriched": 0,
            "already_complete": 0,
            "errors": 0,
            "committee_relationships": 0,
        }

    def get_bills_needing_enrichment(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Get bills from database that need enrichment (missing summary or bill_type).

        Args:
            limit: Maximum number of bills to return. None (or 0) means no limit.

        Returns:
            Query result rows, each exposing "number" and "session",
            ordered by introduced_date descending.
        """
        query = """
            MATCH (b:Bill)
            WHERE b.summary IS NULL OR b.bill_type IS NULL
            RETURN b.number AS number, b.session AS session
            ORDER BY b.introduced_date DESC
        """
        if limit:
            # int() guarantees only a number is ever interpolated into the query text.
            query += f" LIMIT {int(limit)}"

        results = self.neo4j.run_query(query)
        print(f"\n📊 Found {len(results):,} bills needing enrichment")
        return results

    def enrich_bill(self, bill_number: str, session: str) -> bool:
        """
        Fetch detailed bill data and update the database.

        The LEGISinfo request is made even in dry-run mode; only the database
        writes are skipped. All exceptions are caught, counted in
        stats["errors"] and reported on stdout so that one bad bill does not
        abort a long batch.

        Args:
            bill_number: Bill number as stored on the Bill node.
            session: Session identifier as stored on the Bill node.

        Returns:
            True if bill was enriched, False if skipped or error
        """
        try:
            # Fetch detailed bill data from LEGISinfo
            bill_data = self.legis.get_bill(session, bill_number)

            # Handle case where API returns a list instead of dict
            if isinstance(bill_data, list):
                if not bill_data:
                    print(f" ⚠️ Empty list returned for {bill_number}")
                    self.stats["errors"] += 1
                    return False
                # Take the first bill if multiple returned (likely different versions)
                bill_data = bill_data[0]

            # Validate we have a dict
            if not bill_data or not isinstance(bill_data, dict):
                print(f" ⚠️ Invalid data type for {bill_number}: {type(bill_data)}")
                self.stats["errors"] += 1
                return False

            # Extract enrichment fields
            updates = self._extract_enrichment_fields(bill_data)
            if not updates:
                print(f" ✓ {bill_number} already complete")
                self.stats["already_complete"] += 1
                return False

            # Update database
            if self.dry_run:
                print(f" [DRY RUN] Would update {bill_number} with: {list(updates.keys())}")
            else:
                self._update_bill(bill_number, session, updates)

            # Extract and create committee relationships
            committees = self._extract_committees(bill_data)
            if committees and not self.dry_run:
                self._create_committee_relationships(bill_number, session, committees)
            elif committees:
                print(f" [DRY RUN] Would create {len(committees)} committee relationships")

            self.stats["enriched"] += 1
            return True

        except ValueError as e:
            # JSON decode errors - older bills may not have data available
            message = str(e)
            if "Expecting value" in message or "JSON" in message:
                print(f" ⚠️ No JSON data available for {bill_number} (likely too old)")
            else:
                print(f" ❌ ValueError enriching {bill_number}: {e}")
            self.stats["errors"] += 1
            return False
        except Exception as e:
            # Deliberate catch-all: keep the batch going and surface the failure.
            print(f" ❌ Error enriching {bill_number}: {e}")
            self.stats["errors"] += 1
            return False

    def _extract_enrichment_fields(self, bill_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract fields to enrich from bill detail JSON.

        Only fields present in the payload are included, so an empty result
        means there is nothing to write for this bill.

        Args:
            bill_data: Bill detail dict as returned by LEGISinfo.

        Returns:
            Mapping of Bill property name -> value to set.
        """
        updates: Dict[str, Any] = {}

        # Summary
        summary = bill_data.get("ShortLegislativeSummaryEn")
        if summary:
            updates["summary"] = summary
            updates["summary_fr"] = bill_data.get("ShortLegislativeSummaryFr")
            updates["full_summary_available"] = bill_data.get("IsFullLegislativeSummaryAvailable", False)

        # Bill type classification
        bill_type = bill_data.get("BillDocumentTypeNameEn")
        if bill_type:
            updates["bill_type"] = bill_type
            updates["bill_type_fr"] = bill_data.get("BillDocumentTypeNameFr")

        # Government vs private member
        is_gov = bill_data.get("IsGovernmentBill")
        if is_gov is not None:
            updates["is_government_bill"] = is_gov
            updates["is_private_member_bill"] = not is_gov

        # Originating chamber
        chamber = bill_data.get("OriginatingChamberNameEn")
        if chamber:
            updates["originating_chamber"] = chamber
            updates["originating_chamber_fr"] = bill_data.get("OriginatingChamberNameFr")

        # Latest event
        latest_event = bill_data.get("LatestBillEventTypeName")
        if latest_event:
            updates["latest_event"] = latest_event

        # Pro forma flag
        is_proforma = bill_data.get("IsProForma")
        if is_proforma is not None:
            updates["is_proforma"] = is_proforma

        bill_form = bill_data.get("BillFormName")
        if bill_form:
            updates["bill_form"] = bill_form

        # Statute info (if passed into law)
        statute_year = bill_data.get("StatuteYear")
        if statute_year:
            updates["statute_year"] = statute_year

        statute_chapter = bill_data.get("StatuteChapter")
        if statute_chapter:
            updates["statute_chapter"] = statute_chapter

        # Reinstated info
        reinstated = bill_data.get("DidReinstateFromPreviousSession")
        if reinstated is not None:
            updates["reinstated_from_previous"] = reinstated

        reinstated_from = bill_data.get("ReinstatedFromBillNumberCode")
        if reinstated_from:
            updates["reinstated_from_bill"] = reinstated_from

        return updates

    def _extract_committees(self, bill_data: Dict[str, Any]) -> List[Dict[str, str]]:
        """
        Extract committee referrals from bill stages.

        House stages are scanned first, then Senate stages. Missing or null
        containers and fields are treated as empty so a sparse payload never
        raises.

        Args:
            bill_data: Bill detail dict as returned by LEGISinfo.

        Returns:
            One dict per stage that carries a committee, with string values
            under "code", "name" and "stage".
        """
        committees: List[Dict[str, str]] = []

        # `or {}` / `or []` also cover keys that are present but null.
        bill_stages = bill_data.get("BillStages") or {}

        for stages_key in ("HouseBillStages", "SenateBillStages"):
            for stage in bill_stages.get(stages_key) or []:
                committee_data = stage.get("Committee")
                if not committee_data:
                    continue
                committees.append({
                    "code": committee_data.get("CommitteeAcronym") or "",
                    "name": committee_data.get("CommitteeNameEn") or "",
                    "stage": stage.get("BillStageName") or "",
                })

        return committees

    def _update_bill(self, bill_number: str, session: str, updates: Dict[str, Any]):
        """
        Update bill in Neo4j with enrichment fields.

        Args:
            bill_number: Number of the Bill node to match.
            session: Session of the Bill node to match.
            updates: Property name -> value. Names are interpolated into the
                query text (values are passed as parameters), so they must be
                trusted identifiers such as those produced by
                _extract_enrichment_fields.
        """
        # Build SET clause
        set_parts = [f"b.{key} = ${key}" for key in updates]
        set_parts.append("b.updated_at = $updated_at")
        set_clause = ", ".join(set_parts)

        query = f"""
            MATCH (b:Bill {{number: $number, session: $session}})
            SET {set_clause}
            RETURN b
        """

        # datetime.utcnow() is deprecated; take an aware UTC "now" and drop the
        # tzinfo so the stored string keeps the same naive-UTC ISO format.
        updated_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

        params = {
            "number": bill_number,
            "session": session,
            "updated_at": updated_at,
            **updates
        }

        self.neo4j.run_query(query, params)
        print(f" ✅ Enriched {bill_number} with {len(updates)} fields")

    def _create_committee_relationships(
        self,
        bill_number: str,
        session: str,
        committees: List[Dict[str, str]]
    ):
        """
        Create REFERRED_TO relationships between bills and committees.

        Committees without a code are skipped. Committee nodes and the
        relationship are MERGEd, so name, chamber and stage are only written
        when the node or relationship is first created.

        Args:
            bill_number: Number of the Bill node to match.
            session: Session of the Bill node to match.
            committees: Dicts with "code", "name" and "stage" keys.
        """
        for committee in committees:
            if not committee.get("code"):
                continue

            # Infer chamber from committee name; everything that is not
            # explicitly a Senate committee (including "Committee of the
            # Whole") is treated as a House committee.
            name = committee.get("name") or ""
            chamber = "Senate" if "Senate" in name else "House"

            query = """
                MATCH (b:Bill {number: $number, session: $session})
                MERGE (c:Committee {code: $code})
                ON CREATE SET c.name = $name, c.chamber = $chamber, c.created_at = datetime()
                MERGE (b)-[r:REFERRED_TO]->(c)
                ON CREATE SET r.stage = $stage, r.created_at = datetime()
            """

            params = {
                "number": bill_number,
                "session": session,
                "code": committee["code"],
                "name": name,
                "chamber": chamber,
                "stage": committee.get("stage") or "",
            }

            self.neo4j.run_query(query, params)
            self.stats["committee_relationships"] += 1

    def run(self, limit: Optional[int] = None):
        """
        Run the enrichment process.

        Args:
            limit: Maximum number of bills to process. None means all bills
                that still need enrichment.
        """
        print("=" * 80)
        print("BILLS DATA ENRICHMENT")
        print("=" * 80)

        if self.dry_run:
            print("⚠️ DRY RUN MODE - No changes will be made")

        # Get bills needing enrichment
        bills = self.get_bills_needing_enrichment(limit=limit)
        self.stats["total_bills"] = len(bills)

        if not bills:
            print("\n✅ All bills already enriched!")
            return

        print(f"\n🔄 Enriching {len(bills):,} bills...")
        print("")

        # Process each bill
        start_time = time.time()
        for i, bill in enumerate(bills, 1):
            if i % 10 == 0:
                elapsed = time.time() - start_time
                rate = i / elapsed if elapsed > 0 else 0
                remaining = (len(bills) - i) / rate if rate > 0 else 0
                print(f"\n📊 Progress: {i}/{len(bills)} ({i/len(bills)*100:.1f}%) | "
                      f"Rate: {rate:.1f} bills/sec | ETA: {remaining/60:.1f}min")

            bill_number = bill["number"]
            session = bill["session"]

            # No newline: enrich_bill prints the outcome on the same line.
            print(f"{i:4d}. Enriching {session}/{bill_number}...", end="")
            self.enrich_bill(bill_number, session)

            # Be respectful of LEGISinfo (no explicit rate limit, but don't hammer)
            time.sleep(0.1)

        # Print summary
        elapsed = time.time() - start_time
        # Same zero guard as the in-loop rate above.
        overall_rate = self.stats["total_bills"] / elapsed if elapsed > 0 else 0.0

        print("\n" + "=" * 80)
        print("ENRICHMENT COMPLETE")
        print("=" * 80)
        print(f"Total bills processed: {self.stats['total_bills']:,}")
        print(f"Bills enriched: {self.stats['enriched']:,}")
        print(f"Already complete: {self.stats['already_complete']:,}")
        print(f"Errors: {self.stats['errors']:,}")
        print(f"Committee relationships: {self.stats['committee_relationships']:,}")
        print(f"Time elapsed: {elapsed/60:.1f} minutes")
        print(f"Rate: {overall_rate:.1f} bills/sec")
        print("=" * 80)


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="Enrich bills with detailed LEGISinfo data")
    parser.add_argument("--limit", type=int, help="Only process first N bills (for testing)")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be updated without making changes")
    parser.add_argument("--env-file", default=None, help="Path to .env file (default: packages/data-pipeline/.env)")
    args = parser.parse_args()

    # Load environment
    env_file = args.env_file or PROJECT_ROOT / "packages" / "data-pipeline" / ".env"
    if Path(env_file).exists():
        load_dotenv(env_file)
        print(f"✅ Loaded environment from {env_file}")
    else:
        print(f"⚠️ No .env file found at {env_file}, using system environment")

    # Connect to Neo4j
    neo4j_client = Neo4jClient(
        uri=os.getenv("NEO4J_URI", "bolt://localhost:7687"),
        user=os.getenv("NEO4J_USER", "neo4j"),
        password=os.getenv("NEO4J_PASSWORD", "password")
    )

    try:
        # Run enrichment
        enricher = BillEnricher(neo4j_client, dry_run=args.dry_run)
        enricher.run(limit=args.limit)
    finally:
        # Cleanup - always release the connection, even if enrichment fails
        neo4j_client.close()


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/northernvariables/FedMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.