Skip to main content
Glama
northernvariables

FedMCP - Federal Parliamentary Information

bill_text.py (9.82 kB)
"""Bill text ingestion from PostgreSQL to Neo4j. This module imports bill text data from the OpenParliament PostgreSQL database into the Neo4j graph database. Data Source: bills_billtext table (5,280 records) - Full text of bills in English and French - Bill summaries - Links to bills_bill table Neo4j Schema: Nodes: - BillText: Full text and summaries of bills Relationships: - (Bill)-[:HAS_TEXT]->(BillText): Links bills to their full text Indexes: - BillText(id): Unique constraint - BillText(docid): Index for lookups - Full-text index on text_en, text_fr, summary_en Example: >>> from fedmcp_pipeline.utils.neo4j_client import Neo4jClient >>> from fedmcp_pipeline.utils.postgres_client import PostgresClient >>> >>> neo4j = Neo4jClient(uri="bolt://localhost:7687", user="neo4j", password="password") >>> postgres = PostgresClient(dbname="openparliament", user="fedmcp", password="password") >>> >>> # Sample import (10 bill texts) >>> results = ingest_bill_text_sample(neo4j, postgres, text_limit=10) >>> >>> # Full import (all 5,280 bill texts) >>> results = ingest_bill_text_full(neo4j, postgres) """ from typing import Optional, Dict, Any, List from ..utils.neo4j_client import Neo4jClient from ..utils.postgres_client import PostgresClient from ..utils.progress import ProgressTracker, logger def create_bill_text_schema(neo4j_client: Neo4jClient) -> None: """ Create Neo4j schema for bill texts. 
Creates: - Unique constraint on BillText.id - Index on BillText.docid - Full-text index on text_en - Full-text index on text_fr - Full-text index on summary_en Args: neo4j_client: Neo4j client instance """ logger.info("Creating bill text schema...") # Create unique constraint on id neo4j_client.run_query(""" CREATE CONSTRAINT bill_text_id IF NOT EXISTS FOR (bt:BillText) REQUIRE bt.id IS UNIQUE """) # Create index on docid neo4j_client.run_query(""" CREATE INDEX bill_text_docid IF NOT EXISTS FOR (bt:BillText) ON (bt.docid) """) # Create full-text indexes try: neo4j_client.run_query(""" CREATE FULLTEXT INDEX bill_text_en IF NOT EXISTS FOR (bt:BillText) ON EACH [bt.text_en] """) except Exception as e: logger.warning(f"Full-text index bill_text_en may already exist: {e}") try: neo4j_client.run_query(""" CREATE FULLTEXT INDEX bill_text_fr IF NOT EXISTS FOR (bt:BillText) ON EACH [bt.text_fr] """) except Exception as e: logger.warning(f"Full-text index bill_text_fr may already exist: {e}") try: neo4j_client.run_query(""" CREATE FULLTEXT INDEX bill_text_summary IF NOT EXISTS FOR (bt:BillText) ON EACH [bt.summary_en] """) except Exception as e: logger.warning(f"Full-text index bill_text_summary may already exist: {e}") logger.info("✅ Bill text schema created") def ingest_bill_texts( neo4j_client: Neo4jClient, postgres_client: PostgresClient, batch_size: int = 1000, limit: Optional[int] = None ) -> int: """ Ingest bill texts from PostgreSQL to Neo4j. 
Creates BillText nodes with properties: - id: Primary key from PostgreSQL - bill_id: Foreign key to bills_bill - docid: Document ID - text_en: Full text in English - text_fr: Full text in French - summary_en: Summary in English - created: Timestamp when text was created Args: neo4j_client: Neo4j client instance postgres_client: PostgreSQL client instance batch_size: Number of records to process per batch (default: 1000) limit: Optional limit on total records to import Returns: Number of BillText nodes created """ logger.info("Ingesting bill texts from PostgreSQL...") # Fetch bill texts query = """ SELECT id, bill_id, docid, created, text_en, text_fr, summary_en FROM bills_billtext ORDER BY id """ if limit: query += f" LIMIT {limit}" bill_texts = postgres_client.execute_query(query) logger.info(f"Fetched {len(bill_texts):,} bill texts from PostgreSQL") if not bill_texts: logger.warning("No bill texts found in PostgreSQL") return 0 # Process in batches progress = ProgressTracker( total=len(bill_texts), desc="Creating BillText nodes", unit="texts" ) total_created = 0 for i in range(0, len(bill_texts), batch_size): batch = bill_texts[i:i + batch_size] # Convert timestamps to ISO format for text in batch: if text.get("created"): text["created"] = text["created"].isoformat() # Create BillText nodes cypher = """ UNWIND $bill_texts AS bt MERGE (text:BillText {id: bt.id}) SET text.bill_id = bt.bill_id, text.docid = bt.docid, text.created = datetime(bt.created), text.text_en = bt.text_en, text.text_fr = bt.text_fr, text.summary_en = bt.summary_en RETURN count(text) as created """ result = neo4j_client.run_query(cypher, {"bill_texts": batch}) created = result[0]["created"] if result else 0 total_created += created progress.update(len(batch)) progress.close() logger.info(f"✅ Created {total_created:,} BillText nodes") return total_created def link_texts_to_bills( neo4j_client: Neo4jClient, batch_size: int = 5000 ) -> int: """ Create HAS_TEXT relationships between Bills and 
BillTexts. Matches bills by their PostgreSQL bill_id stored on BillText nodes. Args: neo4j_client: Neo4j client instance batch_size: Number of relationships to create per batch Returns: Number of HAS_TEXT relationships created """ logger.info("Creating HAS_TEXT relationships...") # Get count of bill texts count_result = neo4j_client.run_query(""" MATCH (bt:BillText) RETURN count(bt) as total """) total = count_result[0]["total"] if count_result else 0 if total == 0: logger.warning("No BillText nodes found") return 0 logger.info(f"Found {total:,} BillText nodes to link") # Create relationships in batches cypher = """ MATCH (bt:BillText) WHERE bt.bill_id IS NOT NULL WITH bt LIMIT $batch_size MATCH (b:Bill) WHERE b.postgres_id = bt.bill_id MERGE (b)-[r:HAS_TEXT]->(bt) RETURN count(r) as created """ progress = ProgressTracker( total=total, desc="Creating HAS_TEXT relationships", unit="links" ) total_created = 0 processed = 0 while processed < total: result = neo4j_client.run_query(cypher, {"batch_size": batch_size}) created = result[0]["created"] if result else 0 if created == 0: break # No more relationships to create total_created += created processed += created progress.update(created) progress.close() logger.info(f"✅ Created {total_created:,} HAS_TEXT relationships") return total_created def ingest_bill_text_sample( neo4j_client: Neo4jClient, postgres_client: PostgresClient, text_limit: int = 10 ) -> Dict[str, int]: """ Import a sample of bill texts for testing. This function imports a small subset of bill texts (default: 10) to validate the import process before running the full import. 
Args: neo4j_client: Neo4j client instance postgres_client: PostgreSQL client instance text_limit: Number of bill texts to import (default: 10) Returns: Dictionary with counts: - bill_texts: Number of BillText nodes created - has_text_links: Number of HAS_TEXT relationships created """ logger.info("=" * 80) logger.info(f"BILL TEXT SAMPLE IMPORT ({text_limit} texts)") logger.info("=" * 80) results = {} # Create schema create_bill_text_schema(neo4j_client) # Import bill texts results["bill_texts"] = ingest_bill_texts( neo4j_client, postgres_client, limit=text_limit ) # Link to bills results["has_text_links"] = link_texts_to_bills(neo4j_client) logger.info("=" * 80) logger.info("✅ BILL TEXT SAMPLE IMPORT COMPLETE") logger.info("=" * 80) return results def ingest_bill_text_full( neo4j_client: Neo4jClient, postgres_client: PostgresClient ) -> Dict[str, int]: """ Import all bill texts from PostgreSQL to Neo4j. This function imports all 5,280 bill texts from the OpenParliament database. Args: neo4j_client: Neo4j client instance postgres_client: PostgreSQL client instance Returns: Dictionary with counts: - bill_texts: Number of BillText nodes created - has_text_links: Number of HAS_TEXT relationships created """ logger.info("=" * 80) logger.info("BILL TEXT FULL IMPORT (5,280 texts)") logger.info("=" * 80) results = {} # Create schema create_bill_text_schema(neo4j_client) # Import all bill texts results["bill_texts"] = ingest_bill_texts( neo4j_client, postgres_client, limit=None ) # Link to bills results["has_text_links"] = link_texts_to_bills(neo4j_client) logger.info("=" * 80) logger.info("✅ BILL TEXT FULL IMPORT COMPLETE") logger.info("=" * 80) return results

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/northernvariables/FedMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.