Skip to main content
Glama
northernvariables

FedMCP - Federal Parliamentary Information

cli.py (8.76 kB)
"""Command-line interface for CanadaGPT data pipeline.""" import argparse import sys from pathlib import Path from .utils.config import Config from .utils.neo4j_client import Neo4jClient from .utils.progress import logger from .ingest.parliament import ingest_parliament_data from .ingest.lobbying import ingest_lobbying_data from .ingest.finances import ingest_financial_data from .relationships.political import build_political_structure from .relationships.legislative import build_legislative_relationships from .relationships.lobbying import build_lobbying_network from .relationships.financial import build_financial_flows def run_full_pipeline(config: Config) -> None: """Run complete data ingestion pipeline.""" logger.info("🚀 Starting FULL PIPELINE") logger.info(f"Neo4j URI: {config.neo4j_uri}") logger.info(f"Batch size: {config.batch_size:,}") logger.info("") with Neo4jClient(config.neo4j_uri, config.neo4j_user, config.neo4j_password) as client: # Test connection client.test_connection() # 1. Parliament data ingest_parliament_data(client, batch_size=config.batch_size) # 2. Lobbying data ingest_lobbying_data(client, batch_size=config.batch_size) # 3. Financial data ingest_financial_data(client, batch_size=config.batch_size) # 4. Build relationships build_political_structure(client, batch_size=config.batch_size) build_legislative_relationships(client, batch_size=config.batch_size) build_lobbying_network(client, batch_size=config.batch_size) build_financial_flows(client, batch_size=config.batch_size) # 5. 
Show final stats stats = client.get_stats() logger.info("=" * 60) logger.success("✅ FULL PIPELINE COMPLETE") logger.info(f"Total nodes: {stats['total_nodes']:,}") logger.info(f"Total relationships: {stats['total_relationships']:,}") logger.info("") logger.info("Top node types:") for label, count in sorted(stats['node_counts'].items(), key=lambda x: x[1], reverse=True)[:10]: logger.info(f" {label}: {count:,}") logger.info("") logger.info("Top relationship types:") for rel_type, count in sorted(stats['relationship_counts'].items(), key=lambda x: x[1], reverse=True)[:10]: logger.info(f" {rel_type}: {count:,}") logger.info("=" * 60) def run_parliament_only(config: Config) -> None: """Run only parliamentary data ingestion.""" logger.info("🏛️ Starting PARLIAMENT INGESTION") with Neo4jClient(config.neo4j_uri, config.neo4j_user, config.neo4j_password) as client: client.test_connection() ingest_parliament_data(client, batch_size=config.batch_size) build_political_structure(client, batch_size=config.batch_size) def run_lobbying_only(config: Config) -> None: """Run only lobbying data ingestion.""" logger.info("🤝 Starting LOBBYING INGESTION") with Neo4jClient(config.neo4j_uri, config.neo4j_user, config.neo4j_password) as client: client.test_connection() ingest_lobbying_data(client, batch_size=config.batch_size) def run_finances_only(config: Config) -> None: """Run only financial data ingestion.""" logger.info("💰 Starting FINANCIAL INGESTION") with Neo4jClient(config.neo4j_uri, config.neo4j_user, config.neo4j_password) as client: client.test_connection() ingest_financial_data(client, batch_size=config.batch_size) def run_relationships_only(config: Config) -> None: """Build relationships only (assumes data already loaded).""" logger.info("🔗 Starting RELATIONSHIP BUILDING") with Neo4jClient(config.neo4j_uri, config.neo4j_user, config.neo4j_password) as client: client.test_connection() build_political_structure(client, batch_size=config.batch_size) 
build_legislative_relationships(client, batch_size=config.batch_size) build_lobbying_network(client, batch_size=config.batch_size) build_financial_flows(client, batch_size=config.batch_size) def test_connection(config: Config) -> None: """Test Neo4j connection and show database stats.""" logger.info("🔍 Testing Neo4j connection...") with Neo4jClient(config.neo4j_uri, config.neo4j_user, config.neo4j_password) as client: info = client.test_connection() stats = client.get_stats() logger.info("") logger.success("✅ Connection successful!") logger.info(f"Server: {info['name']} {info['version']} ({info['edition']})") logger.info(f"Total nodes: {stats['total_nodes']:,}") logger.info(f"Total relationships: {stats['total_relationships']:,}") if stats['total_nodes'] > 0: logger.info("") logger.info("Node counts:") for label, count in sorted(stats['node_counts'].items(), key=lambda x: x[1], reverse=True): logger.info(f" {label}: {count:,}") if stats['total_relationships'] > 0: logger.info("") logger.info("Relationship counts:") for rel_type, count in sorted(stats['relationship_counts'].items(), key=lambda x: x[1], reverse=True): logger.info(f" {rel_type}: {count:,}") def main(): """Main CLI entry point.""" parser = argparse.ArgumentParser( description="CanadaGPT Data Pipeline - Ingest Canadian government data into Neo4j", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Run full pipeline (initial load, ~4-6 hours) canadagpt-ingest --full # Test connection and show stats canadagpt-ingest --test # Ingest only parliament data canadagpt-ingest --parliament # Build only relationships (assumes data loaded) canadagpt-ingest --relationships # Validate configuration canadagpt-ingest --validate """, ) # Pipeline modes (mutually exclusive) mode_group = parser.add_mutually_exclusive_group(required=True) mode_group.add_argument("--full", action="store_true", help="Run full pipeline (all data + relationships)") mode_group.add_argument("--parliament", 
action="store_true", help="Ingest only parliamentary data") mode_group.add_argument("--lobbying", action="store_true", help="Ingest only lobbying data") mode_group.add_argument("--finances", action="store_true", help="Ingest only financial data") mode_group.add_argument("--relationships", action="store_true", help="Build relationships only") mode_group.add_argument("--test", action="store_true", help="Test connection and show stats") mode_group.add_argument("--validate", action="store_true", help="Validate configuration") mode_group.add_argument("--incremental", action="store_true", help="Incremental update (TODO)") # Configuration options parser.add_argument("--env-file", type=Path, help="Path to .env file (default: auto-detect)") parser.add_argument("--batch-size", type=int, help="Batch size for Neo4j operations (default: 10000)") parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging") args = parser.parse_args() # Load configuration try: config = Config(env_file=args.env_file) # Override batch size if specified if args.batch_size: config.batch_size = args.batch_size # Set log level if args.verbose: config.log_level = "DEBUG" except ValueError as e: logger.error(f"Configuration error: {e}") logger.info("Create a .env file with NEO4J_URI and NEO4J_PASSWORD") logger.info("See .env.example for template") sys.exit(1) # Execute requested mode try: if args.validate: logger.info("Validating configuration...") config.validate() logger.success("✅ Configuration is valid") elif args.test: test_connection(config) elif args.full: run_full_pipeline(config) elif args.parliament: run_parliament_only(config) elif args.lobbying: run_lobbying_only(config) elif args.finances: run_finances_only(config) elif args.relationships: run_relationships_only(config) elif args.incremental: logger.error("Incremental mode not yet implemented") sys.exit(1) except KeyboardInterrupt: logger.warning("\n⚠️ Pipeline interrupted by user") sys.exit(130) except Exception 
as e: logger.error(f"Pipeline failed: {e}") logger.exception(e) sys.exit(1) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/northernvariables/FedMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.