"""Government consultations ingestion into Neo4j."""
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any, List
# Add fedmcp package to path
FEDMCP_PATH = Path(__file__).parent.parent.parent.parent / "fedmcp" / "src"
sys.path.insert(0, str(FEDMCP_PATH))
from fedmcp.clients.consultations import ConsultationsClient, Consultation
from ..utils.neo4j_client import Neo4jClient
from ..utils.progress import logger
def _clear_consultations(neo4j_client: Neo4jClient) -> None:
    """Delete every Consultation node (and its relationships), 5000 at a time.

    Looping over ``LIMIT``-ed deletes bounds the amount of work done by any
    single query instead of detaching/deleting the whole label at once.
    """
    logger.info("Clearing existing Consultation data...")
    while True:
        result = neo4j_client.run_query(
            "MATCH (n:Consultation) WITH n LIMIT 5000 DETACH DELETE n RETURN count(n) as deleted"
        )
        deleted = result[0]["deleted"] if result else 0
        if deleted == 0:
            break
        logger.info(f" Deleted {deleted} Consultation nodes...")
    logger.info("✓ Cleared existing Consultation data")


def _consultation_to_props(cons: Consultation, updated_at: str) -> Dict[str, Any]:
    """Build the Neo4j property map for a single consultation record.

    Titles are truncated to 2000 characters and descriptions to 5000, a
    ``start_year`` is derived from the first four characters of
    ``start_date`` when they parse as an integer, and ``None`` values are
    dropped so they are not written as properties.

    Args:
        cons: Consultation record from ``ConsultationsClient``.
        updated_at: Timestamp string stamped onto every node of this run.

    Returns:
        Dict of non-None node properties.
    """
    start_year = None
    if cons.start_date:
        try:
            start_year = int(cons.start_date[:4])
        except (ValueError, IndexError, TypeError):
            # NOTE(review): assumes start_date is an ISO-like "YYYY-..." string;
            # anything unparseable (or not sliceable) just leaves start_year unset.
            start_year = None

    props = {
        # NOTE(review): a missing registration_number would yield
        # "consultation-None"; presumably the dataset always provides one.
        "id": f"consultation-{cons.registration_number}",
        "registration_number": cons.registration_number,
        "title_en": cons.title_en[:2000] if cons.title_en else None,
        "title_fr": cons.title_fr[:2000] if cons.title_fr else None,
        "description_en": cons.description_en[:5000] if cons.description_en else None,
        "description_fr": cons.description_fr[:5000] if cons.description_fr else None,
        "start_date": cons.start_date,
        "end_date": cons.end_date,
        "start_year": start_year,
        "status": cons.status,
        "status_description": cons.status_description,
        "subjects": cons.subjects,
        "subject_descriptions": cons.subject_descriptions,
        "organization": cons.owner_org_title,
        "organization_code": cons.owner_org,
        "partner_departments": cons.partner_departments,
        "profile_page_en": cons.profile_page_en,
        "profile_page_fr": cons.profile_page_fr,
        "report_available_online": cons.report_available_online,
        "report_link_en": cons.report_link_en,
        "report_link_fr": cons.report_link_fr,
        "updated_at": updated_at,
    }
    return {key: value for key, value in props.items() if value is not None}


def _merge_departments(neo4j_client: Neo4jClient, orgs: Dict[str, Dict[str, str]]) -> int:
    """MERGE one Department node per organization and return how many succeeded.

    Existing Department nodes keep their current ``name``; the consultation
    dataset's organization title only fills it in when it is missing.
    Failures are isolated per organization so one bad row does not stop the rest.
    """
    logger.info(f"Merging {len(orgs):,} organization nodes...")
    merged = 0
    for org_data in orgs.values():
        try:
            neo4j_client.run_query(
                """
                MERGE (d:Department {id: $id})
                ON CREATE SET d.name = $name
                ON MATCH SET d.name = COALESCE(d.name, $name)
                """,
                org_data,
            )
        except Exception as e:
            logger.debug(f"Error creating department {org_data['id']}: {e}")
        else:
            merged += 1
    failed = len(orgs) - merged
    if failed:
        # Surface failures at a visible level; per-org details stay at debug.
        logger.warning(f"Failed to merge {failed:,} organization nodes")
    return merged


def _link_consultations_to_departments(neo4j_client: Neo4jClient) -> int:
    """Create FROM_DEPARTMENT edges by organization code and return their total count."""
    logger.info("Creating Consultation → Department relationships...")
    neo4j_client.run_query(
        """
        MATCH (c:Consultation)
        WHERE c.organization_code IS NOT NULL
        MATCH (d:Department {id: c.organization_code})
        MERGE (c)-[:FROM_DEPARTMENT]->(d)
        """
    )
    # Consultation nodes were just recreated, so every FROM_DEPARTMENT edge
    # counted here was created by this run.
    count_result = neo4j_client.run_query(
        """
        MATCH (c:Consultation)-[r:FROM_DEPARTMENT]->()
        RETURN count(r) as count
        """
    )
    rel_count = count_result[0]["count"] if count_result else 0
    logger.info(f" ✓ Created {rel_count:,} FROM_DEPARTMENT relationships")
    return rel_count


def _create_consultation_indexes(neo4j_client: Neo4jClient) -> None:
    """Create lookup and full-text indexes for Consultation nodes (best effort)."""
    logger.info("Creating indexes...")
    indexes = [
        "CREATE INDEX IF NOT EXISTS FOR (n:Consultation) ON (n.id)",
        "CREATE INDEX IF NOT EXISTS FOR (n:Consultation) ON (n.registration_number)",
        "CREATE INDEX IF NOT EXISTS FOR (n:Consultation) ON (n.organization_code)",
        "CREATE INDEX IF NOT EXISTS FOR (n:Consultation) ON (n.status)",
        "CREATE INDEX IF NOT EXISTS FOR (n:Consultation) ON (n.start_year)",
        "CREATE FULLTEXT INDEX consultation_title_fulltext IF NOT EXISTS FOR (n:Consultation) ON EACH [n.title_en, n.description_en]",
    ]
    for idx in indexes:
        try:
            neo4j_client.run_query(idx)
        except Exception as e:
            # Deliberately best-effort: index problems should not fail ingestion.
            logger.debug(f"Index creation note: {e}")
    logger.info("✓ Indexes created")


def ingest_consultations_data(neo4j_client: Neo4jClient, batch_size: int = 1000) -> Dict[str, int]:
    """
    Ingest government consultations from Open Canada portal.

    Downloads the consultations dataset and loads into Neo4j,
    creating Consultation nodes linked to Department nodes.

    This is a full refresh: existing Consultation nodes are deleted and
    recreated. The dataset is fetched and transformed *before* anything is
    deleted, so a download/parse failure leaves the current graph untouched.

    Args:
        neo4j_client: Neo4j client
        batch_size: Batch size for operations

    Returns:
        Dict with counts of created entities. On a fetch or create failure
        only ``{"consultations": 0}`` is returned; otherwise it also contains
        ``departments_merged`` (successful Department MERGEs) and
        ``from_department_relationships``.
    """
    logger.info("=" * 60)
    logger.info("GOVERNMENT CONSULTATIONS INGESTION")
    logger.info("=" * 60)

    client = ConsultationsClient()
    stats: Dict[str, int] = {}

    # Unique organizations (keyed by org code) that become Department nodes.
    unique_orgs: Dict[str, Dict[str, str]] = {}

    logger.info("Fetching consultations data...")
    try:
        all_consultations = client._load_consultations_data()
        logger.info(f"Found {len(all_consultations):,} consultations")

        # One timestamp for the whole run (same naive-UTC ISO format that
        # datetime.utcnow().isoformat() produced, without the deprecated call).
        updated_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

        consultation_data: List[Dict[str, Any]] = []
        for i, cons in enumerate(all_consultations):
            if cons.owner_org_title and cons.owner_org and cons.owner_org not in unique_orgs:
                unique_orgs[cons.owner_org] = {
                    "id": cons.owner_org,
                    "name": cons.owner_org_title,
                }
            consultation_data.append(_consultation_to_props(cons, updated_at))
            if (i + 1) % 1000 == 0:
                logger.info(f"Processed {i + 1:,} consultations...")
    except Exception as e:
        # Nothing has been deleted yet, so the existing graph is preserved.
        logger.error(f"Could not fetch consultations data: {e}")
        stats["consultations"] = 0
        return stats

    # Only now that fresh data is in hand is it safe to drop the old nodes.
    _clear_consultations(neo4j_client)

    try:
        stats["consultations"] = neo4j_client.batch_create_nodes(
            "Consultation", consultation_data, batch_size
        )
        logger.info(f"Created {stats['consultations']:,} Consultation nodes total")
    except Exception as e:
        logger.error(f"Could not create Consultation nodes: {e}")
        stats["consultations"] = 0
        return stats

    stats["departments_merged"] = _merge_departments(neo4j_client, unique_orgs)
    stats["from_department_relationships"] = _link_consultations_to_departments(neo4j_client)
    _create_consultation_indexes(neo4j_client)

    logger.info("=" * 60)
    logger.success("✅ CONSULTATIONS DATA INGESTION COMPLETE")
    logger.info(f"Consultations: {stats['consultations']:,}")
    logger.info(f"Departments Merged: {stats['departments_merged']:,}")
    logger.info(f"FROM_DEPARTMENT Relationships: {stats.get('from_department_relationships', 0):,}")
    logger.info("=" * 60)
    return stats