Skip to main content
Glama
northernvariables

FedMCP - Federal Parliamentary Information

backfill_committee_evidence.py (8.46 kB)
#!/usr/bin/env python3
"""
Backfill Committee Evidence from Historical Evidence IDs

This script uses the backed-up evidence_id mappings to import historical
committee testimony that was recorded before the schema migration.

Usage:
    python scripts/backfill_committee_evidence.py [--limit N] [--session SESSION]

Examples:
    # Backfill all historical evidence
    python scripts/backfill_committee_evidence.py

    # Backfill only 45-1 session
    python scripts/backfill_committee_evidence.py --session 45-1

    # Test with 10 meetings
    python scripts/backfill_committee_evidence.py --limit 10
"""

import sys
import os
import json
import glob
import argparse
import traceback
from pathlib import Path
from typing import List, Dict, Any, Optional

# Add packages to path
PIPELINE_PATH = Path(__file__).parent.parent
sys.path.insert(0, str(PIPELINE_PATH))

from fedmcp_pipeline.utils.neo4j_client import Neo4jClient
from fedmcp_pipeline.utils.progress import logger, ProgressTracker
from fedmcp_pipeline.ingest.committee_evidence_xml_import import CommitteeEvidenceXMLImporter


def load_evidence_mappings(backup_file: str) -> List[Dict[str, Any]]:
    """Load evidence ID mappings from backup file.

    Args:
        backup_file: Path to the JSON backup file.

    Returns:
        The parsed JSON content (the rest of the script treats it as a list
        of mapping dicts).
    """
    logger.info(f"Loading evidence mappings from: {backup_file}")

    # JSON is UTF-8 by specification; do not depend on the platform default.
    with open(backup_file, 'r', encoding='utf-8') as f:
        mappings = json.load(f)

    logger.info(f"Loaded {len(mappings):,} meeting mappings")
    return mappings


def find_committee_for_evidence(
    neo4j: Neo4jClient,
    evidence_id: int,
    meeting_number: int,
    session_id: str
) -> Optional[str]:
    """
    Try to find the committee code for a meeting.

    Strategy:
    1. Look for existing Meeting with same number/session (from daily-import)
    2. Query OpenParliament API as fallback (not implemented yet)

    Args:
        neo4j: Neo4j client used to run the lookup query
        evidence_id: Evidence ID of the meeting (currently unused by the lookup)
        meeting_number: Meeting number to match on
        session_id: Session ID to match on (e.g., '45-1')

    Returns:
        The committee code of the first matching meeting, or None when no
        matching meeting is found.
    """
    # Try to find matching meeting in Neo4j (from daily-import)
    query = """
        MATCH (c:Committee)-[:HELD_MEETING]->(m:Meeting)
        WHERE m.number = $meeting_number
          AND m.session_id = $session_id
        RETURN c.code as committee_code
        LIMIT 1
    """

    result = neo4j.run_query(query, {
        'meeting_number': meeting_number,
        'session_id': session_id
    })

    if result:
        return result[0]['committee_code']

    # Could add OpenParliament API lookup here as fallback
    return None


def backfill_evidence(
    neo4j: Neo4jClient,
    mappings: List[Dict[str, Any]],
    limit: Optional[int] = None,
    session_filter: Optional[str] = None
) -> Dict[str, int]:
    """
    Backfill historical committee evidence.

    Args:
        neo4j: Neo4j client
        mappings: List of evidence ID mappings
        limit: Optional limit on number to process (the newest meetings are kept)
        session_filter: Optional session ID to filter (e.g., '45-1')

    Returns:
        Dict with import statistics
    """
    stats = {
        'total': len(mappings),
        'processed': 0,
        'imported': 0,
        'skipped_no_committee': 0,
        'skipped_exists': 0,
        'errors': 0
    }

    # Filter by session if specified
    if session_filter:
        mappings = [m for m in mappings if m.get('session_id') == session_filter]
        logger.info(f"Filtered to {len(mappings)} meetings in session {session_filter}")
        stats['total'] = len(mappings)

    # Sort by date descending (newest first). This happens BEFORE the limit is
    # applied so that --limit keeps the N newest meetings instead of an
    # arbitrary slice in backup-file order. `or ''` also covers mappings whose
    # 'date' key is present but null, which would otherwise break the sort.
    mappings = sorted(mappings, key=lambda m: m.get('date') or '', reverse=True)

    # Apply limit
    if limit:
        mappings = mappings[:limit]
        logger.info(f"Limited to {limit} meetings")
        stats['total'] = len(mappings)

    logger.info(f"Starting backfill of {len(mappings):,} meetings...")
    print()

    importer = CommitteeEvidenceXMLImporter(neo4j)
    tracker = ProgressTracker(total=len(mappings), desc="Backfilling evidence")

    try:
        for mapping in mappings:
            stats['processed'] += 1
            tracker.update(1)

            evidence_id = mapping.get('evidence_id')
            meeting_number = mapping.get('meeting_number')
            session_id = mapping.get('session_id')
            date = mapping.get('date')

            if not evidence_id or not meeting_number or not session_id:
                logger.warning(f"Skipping incomplete mapping: {mapping}")
                stats['errors'] += 1
                continue

            # Try to find committee code
            committee_code = find_committee_for_evidence(
                neo4j, evidence_id, meeting_number, session_id
            )

            if not committee_code:
                logger.debug(f"No committee found for meeting {meeting_number} ({session_id})")
                stats['skipped_no_committee'] += 1
                continue

            try:
                # Check if evidence already exists
                check_query = """
                    MATCH (e:CommitteeEvidence)
                    WHERE e.evidence_id = $evidence_id
                    RETURN e.id as id
                """
                existing = neo4j.run_query(check_query, {'evidence_id': evidence_id})

                if existing:
                    logger.debug(f"Evidence {evidence_id} already exists, skipping")
                    stats['skipped_exists'] += 1
                    continue

                # Import evidence for this meeting
                # The importer will fetch from DocumentViewer XML using committee_code + meeting_number
                result = importer.import_evidence_for_meetings(
                    committee_code=committee_code,
                    meeting_numbers=[meeting_number],
                    skip_existing=True
                )

                if result['meetings'] > 0:
                    stats['imported'] += 1
                    logger.success(f"✓ Imported {committee_code} meeting {meeting_number} ({date})")
                else:
                    # Not counted in any bucket; leave a trace so the gap
                    # between 'processed' and the other counters is explainable.
                    logger.debug(
                        f"Importer returned no meetings for {committee_code} "
                        f"meeting {meeting_number} ({date})"
                    )

            except Exception as e:
                logger.error(f"Error importing evidence {evidence_id}: {e}")
                stats['errors'] += 1
    finally:
        # Close the tracker even if the committee lookup (which sits outside
        # the per-mapping try) raises and aborts the run.
        tracker.close()

    return stats


def main():
    """Main entry point.

    Parses the command line, resolves the backup file, connects to Neo4j using
    the NEO4J_URI / NEO4J_USERNAME / NEO4J_PASSWORD environment variables,
    runs the backfill and logs a summary. Exits with status 1 when the backup
    file is missing, the password is not set, or the backfill raises.
    """
    parser = argparse.ArgumentParser(description='Backfill historical committee evidence')
    parser.add_argument('--limit', type=int, help='Limit number of meetings to process')
    parser.add_argument('--session', type=str, help='Filter to specific session (e.g., 45-1)')
    parser.add_argument(
        '--backup-file',
        type=str,
        default='packages/data-pipeline/backups/committee_evidence_backup_*.json',
        help='Path to backup JSON file'
    )

    args = parser.parse_args()

    logger.info("=" * 80)
    logger.info("COMMITTEE EVIDENCE BACKFILL SCRIPT")
    logger.info("=" * 80)
    print()

    # Find backup file
    backup_file = None
    if '*' in args.backup_file:
        # Find most recent backup. "Most recent" here means the
        # lexicographically greatest file name — presumably the names embed a
        # sortable timestamp; verify against the backup script.
        backups = glob.glob(args.backup_file)
        if backups:
            backup_file = max(backups)
    else:
        backup_file = args.backup_file

    if not backup_file or not Path(backup_file).exists():
        logger.error(f"Backup file not found: {args.backup_file}")
        logger.info("Create backup with: packages/data-pipeline/scripts/extract_evidence_mappings.py")
        sys.exit(1)

    # Load mappings
    mappings = load_evidence_mappings(backup_file)

    # Connect to Neo4j
    neo4j_uri = os.getenv('NEO4J_URI', 'bolt://10.128.0.3:7687')
    neo4j_user = os.getenv('NEO4J_USERNAME', 'neo4j')
    neo4j_password = os.getenv('NEO4J_PASSWORD')

    if not neo4j_password:
        logger.error("NEO4J_PASSWORD environment variable not set")
        sys.exit(1)

    neo4j = Neo4jClient(uri=neo4j_uri, user=neo4j_user, password=neo4j_password)

    try:
        # Run backfill
        stats = backfill_evidence(
            neo4j=neo4j,
            mappings=mappings,
            limit=args.limit,
            session_filter=args.session
        )

        # Print summary
        print()
        logger.info("=" * 80)
        logger.info("BACKFILL COMPLETE")
        logger.info("=" * 80)
        logger.info(f"Total mappings: {stats['total']:,}")
        logger.info(f"Processed: {stats['processed']:,}")
        logger.success(f"✅ Imported: {stats['imported']:,}")
        logger.info(f"Skipped (no committee): {stats['skipped_no_committee']:,}")
        logger.info(f"Skipped (already exists): {stats['skipped_exists']:,}")
        if stats['errors'] > 0:
            logger.warning(f"⚠️ Errors: {stats['errors']:,}")

    except Exception as e:
        logger.error(f"Backfill failed: {e}")
        traceback.print_exc()
        sys.exit(1)

    finally:
        # Runs on success, on failure and on sys.exit() alike.
        neo4j.close()


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/northernvariables/FedMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.