Skip to main content
Glama
northernvariables

FedMCP - Federal Parliamentary Information

lobbying.py9.9 kB
"""Lobbying data ingestion: registrations, communications, lobbyists, organizations.""" import sys from datetime import datetime from pathlib import Path from typing import Dict, Any # Add fedmcp package to path FEDMCP_PATH = Path(__file__).parent.parent.parent.parent / "fedmcp" / "src" sys.path.insert(0, str(FEDMCP_PATH)) from fedmcp.clients.lobbying import LobbyingRegistryClient from ..utils.neo4j_client import Neo4jClient from ..utils.progress import logger def ingest_lobbying_data(neo4j_client: Neo4jClient, batch_size: int = 10000) -> Dict[str, int]: """ Ingest lobbying registry data. Downloads ~90MB CSV data from Open Canada portal, caches locally, and loads into Neo4j. Args: neo4j_client: Neo4j client batch_size: Batch size for operations Returns: Dict with counts of created entities """ logger.info("=" * 60) logger.info("LOBBYING DATA INGESTION") logger.info("=" * 60) # Use official source (lobbycanada.gc.ca) as the opendata URLs are outdated/broken # The official site is accessible from Cloud Run without any networking issues lobby_client = LobbyingRegistryClient(source="official") stats = {} # Clear existing lobbying data for full refresh (in batches to avoid Neo4j memory limits) logger.info("Clearing existing lobbying data...") for label in ["LobbyRegistration", "LobbyCommunication", "Organization", "Lobbyist"]: while True: result = neo4j_client.run_query(f"MATCH (n:{label}) WITH n LIMIT 10000 DETACH DELETE n RETURN count(n) as deleted") deleted = result[0]["deleted"] if result else 0 if deleted == 0: break logger.info(f" Deleted {deleted} {label} nodes...") logger.info("✓ Cleared existing lobbying data") # Track unique entities for later creation unique_organizations = {} unique_lobbyists = {} # 1. Lobby Registrations logger.info("Fetching lobby registrations (may download 90MB on first run)...") registrations = lobby_client.search_registrations(active_only=False, limit=None) logger.info(f"Found {len(registrations):,} registrations") # Transform to Neo4j format with ALL fields reg_data = [] for i, reg in enumerate(registrations): # Extract organization org_name = reg.client_org_name if org_name and org_name not in unique_organizations: unique_organizations[org_name] = { "id": f"org-{len(unique_organizations)}", "name": org_name, } # Extract lobbyist lobbyist_name = reg.registrant_name if lobbyist_name and lobbyist_name not in unique_lobbyists: unique_lobbyists[lobbyist_name] = { "id": f"lobbyist-{len(unique_lobbyists)}", "name": lobbyist_name, } reg_props = { "id": reg.reg_id or f"reg-{i}", "reg_number": reg.reg_number, "client_org_name": reg.client_org_name, "registrant_name": reg.registrant_name, "effective_date": reg.effective_date, "end_date": reg.end_date if reg.end_date and reg.end_date != "null" else None, "active": reg.is_active, "subject_matters": reg.subject_matters if reg.subject_matters else [], "government_institutions": reg.government_institutions if reg.government_institutions else [], "updated_at": datetime.utcnow().isoformat(), } reg_props = {k: v for k, v in reg_props.items() if v is not None} reg_data.append(reg_props) # Log progress every 10,000 registrations if (i + 1) % 10000 == 0: logger.info(f"Processed {i + 1:,} registrations...") stats["lobby_registrations"] = neo4j_client.batch_create_nodes("LobbyRegistration", reg_data, batch_size) # 2. Lobby Communications logger.info("Fetching lobby communications...") communications = lobby_client.search_communications(limit=None) # Process ALL logger.info(f"Found {len(communications):,} communications") comm_data = [] for i, comm in enumerate(communications): # Extract organization org_name = comm.client_org_name if org_name and org_name not in unique_organizations: unique_organizations[org_name] = { "id": f"org-{len(unique_organizations)}", "name": org_name, } # Extract lobbyist lobbyist_name = comm.registrant_name if lobbyist_name and lobbyist_name not in unique_lobbyists: unique_lobbyists[lobbyist_name] = { "id": f"lobbyist-{len(unique_lobbyists)}", "name": lobbyist_name, } comm_props = { "id": comm.comlog_id or f"comm-{i}", "client_org_name": comm.client_org_name, "registrant_name": comm.registrant_name, "date": comm.comm_date, "dpoh_names": comm.dpoh_names if comm.dpoh_names else [], "dpoh_titles": comm.dpoh_titles if comm.dpoh_titles else [], "institutions": comm.institutions if comm.institutions else [], "subject_matters": comm.subject_matters if comm.subject_matters else [], "updated_at": datetime.utcnow().isoformat(), } comm_props = {k: v for k, v in comm_props.items() if v is not None} comm_data.append(comm_props) # Log progress every 10,000 communications if (i + 1) % 10000 == 0: logger.info(f"Processed {i + 1:,} communications...") stats["lobby_communications"] = neo4j_client.batch_create_nodes("LobbyCommunication", comm_data, batch_size) # 3. Create Organizations logger.info(f"Creating {len(unique_organizations):,} unique organizations...") org_data = list(unique_organizations.values()) stats["organizations"] = neo4j_client.batch_create_nodes("Organization", org_data, batch_size) # 4. Create Lobbyists logger.info(f"Creating {len(unique_lobbyists):,} unique lobbyists...") lobbyist_data = list(unique_lobbyists.values()) stats["lobbyists"] = neo4j_client.batch_create_nodes("Lobbyist", lobbyist_data, batch_size) # 5. Create Relationships logger.info("Creating relationships between entities...") # 5a. LobbyRegistration relationships logger.info(" Creating LobbyRegistration → Organization (ON_BEHALF_OF)...") reg_org_query = """ MATCH (lr:LobbyRegistration) MATCH (o:Organization {name: lr.client_org_name}) MERGE (lr)-[:ON_BEHALF_OF]->(o) """ neo4j_client.run_query(reg_org_query) logger.info(" Creating Lobbyist → LobbyRegistration (REGISTERED_FOR)...") reg_lobbyist_query = """ MATCH (lr:LobbyRegistration) MATCH (l:Lobbyist {name: lr.registrant_name}) MERGE (l)-[:REGISTERED_FOR]->(lr) """ neo4j_client.run_query(reg_lobbyist_query) # 5b. LobbyCommunication relationships logger.info(" Creating LobbyCommunication → Organization (COMMUNICATION_BY)...") comm_org_query = """ MATCH (lc:LobbyCommunication) MATCH (o:Organization {name: lc.client_org_name}) MERGE (lc)-[:COMMUNICATION_BY]->(o) """ neo4j_client.run_query(comm_org_query) logger.info(" Creating LobbyCommunication → Lobbyist (CONDUCTED_BY)...") comm_lobbyist_query = """ MATCH (lc:LobbyCommunication) WHERE lc.registrant_name IS NOT NULL MATCH (l:Lobbyist {name: lc.registrant_name}) MERGE (lc)-[:CONDUCTED_BY]->(l) """ neo4j_client.run_query(comm_lobbyist_query) # 5c. LobbyCommunication → MP (CONTACTED) relationships # This is the critical relationship for MP lobbying pages # Use Cypher's built-in matching capabilities for better performance logger.info(" Creating LobbyCommunication → MP (CONTACTED) relationships...") # Strategy: Use UNWIND to process dpoh_names and match to MPs in Cypher # This is much faster than Python loops with individual queries contacted_query = """ MATCH (lc:LobbyCommunication) WHERE lc.dpoh_names IS NOT NULL AND size(lc.dpoh_names) > 0 UNWIND lc.dpoh_names as dpoh_name MATCH (m:MP) WHERE // Exact match on name toLower(m.name) = toLower(dpoh_name) // Or match Given + Family OR toLower(m.given_name + ' ' + m.family_name) = toLower(dpoh_name) // Or match Family, Given (common format) OR toLower(m.family_name + ', ' + m.given_name) = toLower(dpoh_name) // Or match first name + family (e.g., "John Smith" for "John Robert Smith") OR (m.given_name IS NOT NULL AND m.family_name IS NOT NULL AND toLower(split(m.given_name, ' ')[0] + ' ' + m.family_name) = toLower(dpoh_name)) MERGE (lc)-[:CONTACTED]->(m) """ neo4j_client.run_query(contacted_query) # Count how many CONTACTED relationships were created count_query = """ MATCH ()-[r:CONTACTED]->() RETURN count(r) as count """ count_result = neo4j_client.run_query(count_query) contacted_count = count_result[0]['count'] if count_result else 0 stats["contacted_relationships"] = contacted_count logger.info(f" ✓ Created {contacted_count:,} CONTACTED relationships") logger.info("=" * 60) logger.success("✅ LOBBYING DATA INGESTION COMPLETE") logger.info(f"Registrations: {stats['lobby_registrations']:,}") logger.info(f"Communications: {stats['lobby_communications']:,}") logger.info(f"Organizations: {stats['organizations']:,}") logger.info(f"Lobbyists: {stats['lobbyists']:,}") logger.info(f"CONTACTED Relationships: {stats.get('contacted_relationships', 0):,}") if stats.get('unmatched_dpoh_names'): logger.info(f"Unmatched DPOH Names: {stats['unmatched_dpoh_names']:,}") logger.info("=" * 60) return stats

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/northernvariables/FedMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server