Skip to main content
Glama
northernvariables

FedMCP - Federal Parliamentary Information

backfill_spoke_at.py8.46 kB
#!/usr/bin/env python3 """ Backfill SPOKE_AT relationships for existing statements and committee testimonies. This script creates direct MP → Document/CommitteeEvidence relationships to enable efficient queries like "which MPs spoke in this debate?" The script is idempotent and can be run multiple times safely. """ import sys from pathlib import Path # Add fedmcp_pipeline to path sys.path.insert(0, str(Path(__file__).parent.parent)) from fedmcp_pipeline.utils.neo4j_client import Neo4jClient from fedmcp_pipeline.utils.progress import logger, ProgressTracker def backfill_hansard_spoke_at(neo4j: Neo4jClient, batch_size: int = 10000) -> int: """ Create SPOKE_AT relationships for Hansard statements. Creates (MP)-[:SPOKE_AT]->(Document) relationships by traversing: (Statement)-[:MADE_BY]->(MP) and (Statement)-[:PART_OF]->(Document) Args: neo4j: Neo4j client instance batch_size: Number of relationships to create per batch Returns: Total number of SPOKE_AT relationships created """ logger.info("Backfilling SPOKE_AT relationships for Hansard statements...") # Count total statements that need SPOKE_AT # Check if SPOKE_AT with this specific statement_id already exists count_query = """ MATCH (s:Statement)-[:MADE_BY]->(mp:MP) MATCH (s)-[:PART_OF]->(d:Document) WHERE NOT exists { MATCH (mp)-[r:SPOKE_AT]->(d) WHERE r.statement_id = s.id } RETURN count(*) as total """ result = neo4j.run_query(count_query) total = result[0]['total'] if result else 0 logger.info(f"Found {total:,} statements needing SPOKE_AT relationships") if total == 0: logger.success("✅ All Hansard SPOKE_AT relationships already exist") return 0 # Create SPOKE_AT relationships in batches # Creates ONE relationship per statement (multiple per MP-Document pair) create_query = """ MATCH (s:Statement)-[:MADE_BY]->(mp:MP) MATCH (s)-[:PART_OF]->(d:Document) WHERE NOT exists { MATCH (mp)-[r:SPOKE_AT]->(d) WHERE r.statement_id = s.id } WITH s, mp, d LIMIT $batch_size CREATE (mp)-[r:SPOKE_AT]->(d) SET r.timestamp = s.time, r.statement_id = s.id, r.intervention_id = s.intervention_id, r.person_db_id = s.person_db_id RETURN count(*) as created """ total_created = 0 tracker = ProgressTracker(total, "SPOKE_AT relationships") while True: result = neo4j.run_query(create_query, {"batch_size": batch_size}) created = result[0]['created'] if result else 0 if created == 0: break total_created += created tracker.update(total_created) logger.success(f"✅ Created {total_created:,} Hansard SPOKE_AT relationships") return total_created def backfill_committee_spoke_at(neo4j: Neo4jClient, batch_size: int = 10000) -> int: """ Create SPOKE_AT relationships for committee testimonies. Creates (MP)-[:SPOKE_AT]->(CommitteeEvidence) relationships by traversing: (CommitteeTestimony)-[:TESTIFIED_BY]->(MP) and (CommitteeTestimony)-[:GIVEN_IN]->(CommitteeEvidence) Args: neo4j: Neo4j client instance batch_size: Number of relationships to create per batch Returns: Total number of SPOKE_AT relationships created """ logger.info("Backfilling SPOKE_AT relationships for committee testimonies...") # Count total testimonies that need SPOKE_AT # Check if SPOKE_AT with this specific testimony_id already exists count_query = """ MATCH (ct:CommitteeTestimony)-[:TESTIFIED_BY]->(mp:MP) MATCH (ct)-[:GIVEN_IN]->(ce:CommitteeEvidence) WHERE NOT exists { MATCH (mp)-[r:SPOKE_AT]->(ce) WHERE r.testimony_id = ct.id } RETURN count(*) as total """ result = neo4j.run_query(count_query) total = result[0]['total'] if result else 0 logger.info(f"Found {total:,} testimonies needing SPOKE_AT relationships") if total == 0: logger.success("✅ All committee SPOKE_AT relationships already exist") return 0 # Create SPOKE_AT relationships in batches # Creates ONE relationship per testimony (multiple per MP-CommitteeEvidence pair) create_query = """ MATCH (ct:CommitteeTestimony)-[:TESTIFIED_BY]->(mp:MP) MATCH (ct)-[:GIVEN_IN]->(ce:CommitteeEvidence) WHERE NOT exists { MATCH (mp)-[r:SPOKE_AT]->(ce) WHERE r.testimony_id = ct.id } WITH ct, mp, ce LIMIT $batch_size CREATE (mp)-[r:SPOKE_AT]->(ce) SET r.testimony_id = ct.id, r.intervention_id = ct.intervention_id, r.person_db_id = ct.person_db_id, r.timestamp_hour = ct.timestamp_hour, r.timestamp_minute = ct.timestamp_minute RETURN count(*) as created """ total_created = 0 tracker = ProgressTracker(total, "committee SPOKE_AT relationships") while True: result = neo4j.run_query(create_query, {"batch_size": batch_size}) created = result[0]['created'] if result else 0 if created == 0: break total_created += created tracker.update(total_created) logger.success(f"✅ Created {total_created:,} committee SPOKE_AT relationships") return total_created def verify_spoke_at(neo4j: Neo4jClient) -> None: """ Verify SPOKE_AT relationships were created correctly. Args: neo4j: Neo4j client instance """ logger.info("Verifying SPOKE_AT relationships...") # Count total SPOKE_AT relationships count_query = """ MATCH ()-[r:SPOKE_AT]->() RETURN count(r) as total """ result = neo4j.run_query(count_query) total = result[0]['total'] if result else 0 logger.info(f"Total SPOKE_AT relationships: {total:,}") # Count by target type by_type_query = """ MATCH (mp:MP)-[r:SPOKE_AT]->(target) WITH labels(target) AS target_labels, count(r) AS count RETURN target_labels[0] AS target_type, count ORDER BY count DESC """ result = neo4j.run_query(by_type_query) logger.info("SPOKE_AT relationships by target type:") for row in result: logger.info(f" - {row['target_type']}: {row['count']:,}") # Sample 5 MPs with most speeches top_speakers_query = """ MATCH (mp:MP)-[r:SPOKE_AT]->(d) WITH mp, count(r) AS speech_count ORDER BY speech_count DESC LIMIT 5 RETURN mp.name AS mp_name, speech_count """ result = neo4j.run_query(top_speakers_query) logger.info("Top 5 MPs by speaking activity:") for row in result: logger.info(f" - {row['mp_name']}: {row['speech_count']:,} speeches") logger.success("✅ Verification complete") def main(): """Main execution function.""" import os # Get Neo4j connection details from environment neo4j_uri = os.getenv('NEO4J_URI', 'bolt://localhost:7687') neo4j_user = os.getenv('NEO4J_USERNAME', 'neo4j') neo4j_password = os.getenv('NEO4J_PASSWORD') if not neo4j_password: logger.error("NEO4J_PASSWORD environment variable not set") sys.exit(1) logger.info(f"Connecting to Neo4j at {neo4j_uri}...") neo4j = Neo4jClient(uri=neo4j_uri, user=neo4j_user, password=neo4j_password) try: # Backfill Hansard SPOKE_AT relationships hansard_count = backfill_hansard_spoke_at(neo4j, batch_size=10000) # Backfill Committee SPOKE_AT relationships committee_count = backfill_committee_spoke_at(neo4j, batch_size=10000) # Verify results verify_spoke_at(neo4j) # Summary total = hansard_count + committee_count logger.success(f""" ╔═══════════════════════════════════════╗ ║ SPOKE_AT Backfill Complete ║ ╠═══════════════════════════════════════╣ ║ Hansard: {hansard_count:>10,} relationships ║ ║ Committee: {committee_count:>8,} relationships ║ ║ Total: {total:>12,} relationships ║ ╚═══════════════════════════════════════╝ """) except Exception as e: logger.error(f"Error during backfill: {e}") raise finally: neo4j.close() if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/northernvariables/FedMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server