"""GC InfoBase data ingestion: departmental results, spending, and FTE data."""
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List, Optional
# Add fedmcp package to path
FEDMCP_PATH = Path(__file__).parent.parent.parent.parent / "fedmcp" / "src"
sys.path.insert(0, str(FEDMCP_PATH))
from fedmcp.clients.gc_infobase import GCInfoBaseClient, DepartmentalResult, ProgramExpenditure
from ..utils.neo4j_client import Neo4jClient
from ..utils.progress import logger
def ingest_gc_infobase_data(neo4j_client: Neo4jClient, batch_size: int = 5000) -> Dict[str, int]:
"""
Ingest GC InfoBase data from Open Canada portal.
Downloads performance indicators (27MB) and expenditure/FTE data (3MB),
caches locally, and loads into Neo4j.
Args:
neo4j_client: Neo4j client
batch_size: Batch size for operations
Returns:
Dict with counts of created entities
"""
logger.info("=" * 60)
logger.info("GC INFOBASE DATA INGESTION")
logger.info("=" * 60)
client = GCInfoBaseClient()
stats = {}
# Clear existing GC InfoBase data for full refresh
logger.info("Clearing existing GC InfoBase data...")
for label in ["DepartmentalResult", "ProgramExpenditure", "Department"]:
while True:
result = neo4j_client.run_query(
f"MATCH (n:{label}) WITH n LIMIT 10000 DETACH DELETE n RETURN count(n) as deleted"
)
deleted = result[0]["deleted"] if result else 0
if deleted == 0:
break
logger.info(f" Deleted {deleted} {label} nodes...")
logger.info("✓ Cleared existing GC InfoBase data")
# Track unique departments
unique_departments: Dict[str, Dict[str, str]] = {}
# 1. Departmental Results (Performance Data)
logger.info("Fetching departmental results (may download 28MB on first run)...")
try:
all_results = client.get_program_results(department="", fiscal_year=None)
logger.info(f"Found {len(all_results):,} performance results")
result_data = []
for i, result in enumerate(all_results):
# Track departments
if result.organization and result.organization_id:
if result.organization_id not in unique_departments:
unique_departments[result.organization_id] = {
"id": result.organization_id,
"name": result.organization,
}
result_props = {
"id": f"result-{result.organization_id}-{result.fiscal_year}-{result.indicator_id}",
"organization": result.organization,
"organization_id": result.organization_id,
"fiscal_year": result.fiscal_year,
"program": result.program,
"program_id": result.program_id,
"indicator_name": result.indicator_name,
"indicator_id": result.indicator_id,
"target": result.target,
"actual_result": result.actual_result,
"met_target": result.met_target,
"updated_at": datetime.utcnow().isoformat(),
}
result_props = {k: v for k, v in result_props.items() if v is not None}
result_data.append(result_props)
if (i + 1) % 50000 == 0:
logger.info(f"Processed {i + 1:,} results...")
stats["departmental_results"] = neo4j_client.batch_create_nodes(
"DepartmentalResult", result_data, batch_size
)
except Exception as e:
logger.warning(f"Could not fetch departmental results: {e}")
stats["departmental_results"] = 0
# 2. Program Expenditures and FTEs
logger.info("Fetching program expenditures and FTE data...")
try:
all_expenditures = client.get_department_spending(department="", fiscal_year=None)
logger.info(f"Found {len(all_expenditures):,} expenditure records")
expenditure_data = []
for i, exp in enumerate(all_expenditures):
# Track departments
if exp.organization and exp.organization_id:
if exp.organization_id not in unique_departments:
unique_departments[exp.organization_id] = {
"id": exp.organization_id,
"name": exp.organization,
}
exp_props = {
"id": f"exp-{exp.organization_id}-{exp.fiscal_year}-{exp.program_id}",
"organization": exp.organization,
"organization_id": exp.organization_id,
"fiscal_year": exp.fiscal_year,
"program": exp.program,
"program_id": exp.program_id,
"planned_spending": exp.planned_spending,
"actual_spending": exp.actual_spending,
"variance": exp.variance,
"fte_planned": exp.fte_planned,
"fte_actual": exp.fte_actual,
"updated_at": datetime.utcnow().isoformat(),
}
exp_props = {k: v for k, v in exp_props.items() if v is not None}
expenditure_data.append(exp_props)
if (i + 1) % 10000 == 0:
logger.info(f"Processed {i + 1:,} expenditure records...")
stats["program_expenditures"] = neo4j_client.batch_create_nodes(
"ProgramExpenditure", expenditure_data, batch_size
)
except Exception as e:
logger.warning(f"Could not fetch expenditure data: {e}")
stats["program_expenditures"] = 0
# 3. Create Department Nodes
logger.info(f"Creating {len(unique_departments):,} unique department nodes...")
dept_data = list(unique_departments.values())
stats["departments"] = neo4j_client.batch_create_nodes("Department", dept_data, batch_size)
# 4. Create Relationships
logger.info("Creating relationships between entities...")
# DepartmentalResult → Department
logger.info(" Creating DepartmentalResult → Department (FROM_DEPARTMENT)...")
result_dept_query = """
MATCH (r:DepartmentalResult)
WHERE r.organization_id IS NOT NULL
MATCH (d:Department {id: r.organization_id})
MERGE (r)-[:FROM_DEPARTMENT]->(d)
"""
neo4j_client.run_query(result_dept_query)
# ProgramExpenditure → Department
logger.info(" Creating ProgramExpenditure → Department (FROM_DEPARTMENT)...")
exp_dept_query = """
MATCH (e:ProgramExpenditure)
WHERE e.organization_id IS NOT NULL
MATCH (d:Department {id: e.organization_id})
MERGE (e)-[:FROM_DEPARTMENT]->(d)
"""
neo4j_client.run_query(exp_dept_query)
# Count relationships
rel_count_query = """
MATCH ()-[r:FROM_DEPARTMENT]->()
RETURN count(r) as count
"""
count_result = neo4j_client.run_query(rel_count_query)
rel_count = count_result[0]["count"] if count_result else 0
stats["from_department_relationships"] = rel_count
logger.info(f" ✓ Created {rel_count:,} FROM_DEPARTMENT relationships")
# 5. Create indexes
logger.info("Creating indexes...")
indexes = [
"CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalResult) ON (n.id)",
"CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalResult) ON (n.organization_id)",
"CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalResult) ON (n.fiscal_year)",
"CREATE INDEX IF NOT EXISTS FOR (n:ProgramExpenditure) ON (n.id)",
"CREATE INDEX IF NOT EXISTS FOR (n:ProgramExpenditure) ON (n.organization_id)",
"CREATE INDEX IF NOT EXISTS FOR (n:ProgramExpenditure) ON (n.fiscal_year)",
"CREATE INDEX IF NOT EXISTS FOR (n:Department) ON (n.id)",
"CREATE INDEX IF NOT EXISTS FOR (n:Department) ON (n.name)",
]
for idx in indexes:
try:
neo4j_client.run_query(idx)
except Exception as e:
logger.debug(f"Index creation note: {e}")
logger.info("✓ Indexes created")
logger.info("=" * 60)
logger.success("✅ GC INFOBASE DATA INGESTION COMPLETE")
logger.info(f"Departmental Results: {stats['departmental_results']:,}")
logger.info(f"Program Expenditures: {stats['program_expenditures']:,}")
logger.info(f"Departments: {stats['departments']:,}")
logger.info(f"FROM_DEPARTMENT Relationships: {stats.get('from_department_relationships', 0):,}")
logger.info("=" * 60)
return stats