"""Departmental travel and hospitality expenses ingestion into Neo4j."""
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List
# Make the `fedmcp` package importable: its sources are expected under
# "fedmcp/src", three directories above the directory containing this file.
# The path is inserted at index 0 of sys.path so it is searched before any
# other entry when the `fedmcp.clients` import below is resolved.
FEDMCP_PATH = Path(__file__).parent.parent.parent.parent / "fedmcp" / "src"
sys.path.insert(0, str(FEDMCP_PATH))
from fedmcp.clients.departmental_expenses import DepartmentalExpensesClient
from ..utils.neo4j_client import Neo4jClient
from ..utils.progress import logger
def _utc_timestamp() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Yields the same format as the deprecated ``datetime.utcnow().isoformat()``
    (no UTC offset suffix), so stored ``updated_at`` values keep their shape.
    """
    # Function-scope import: the module header only imports `datetime`.
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _clip(text: Any, limit: int) -> Any:
    """Return the first `limit` characters of `text`, or None when `text` is falsy."""
    return text[:limit] if text else None


def _clear_label(neo4j_client: Neo4jClient, label: str) -> None:
    """Delete every node carrying `label`, at most 10,000 per query, until none remain."""
    logger.info(f"Clearing existing {label} data...")
    # `label` is always one of this module's own literals; Cypher cannot take
    # a label as a query parameter, hence the string interpolation.
    query = f"MATCH (n:{label}) WITH n LIMIT 10000 DETACH DELETE n RETURN count(n) as deleted"
    while True:
        result = neo4j_client.run_query(query)
        deleted = result[0]["deleted"] if result else 0
        if deleted == 0:
            break
        logger.info(f" Deleted {deleted} {label} nodes...")
    logger.info(f"✓ Cleared existing {label} data")


def _common_props(record: Any, node_id: str) -> Dict[str, Any]:
    """Build the node properties shared by travel and hospitality records."""
    return {
        "id": node_id,
        "ref_number": record.ref_number,
        "organization": record.owner_org_title,
        "organization_code": record.owner_org,
        "disclosure_group": record.disclosure_group,
        "title_en": _clip(record.title_en, 500),
        "title_fr": _clip(record.title_fr, 500),
        "name": _clip(record.name, 200),
        "purpose_en": _clip(record.purpose_en, 1000),
        "purpose_fr": _clip(record.purpose_fr, 1000),
        "start_date": record.start_date,
        "end_date": record.end_date,
    }


def _travel_props(record: Any) -> Dict[str, Any]:
    """Build the property dict for one DepartmentalTravel node."""
    props = _common_props(record, f"travel-{record.ref_number}")
    props.update({
        "travel_year": record.travel_year,
        "destination_en": _clip(record.destination_en, 500),
        "destination_fr": _clip(record.destination_fr, 500),
        "airfare": record.airfare,
        "other_transport": record.other_transport,
        "lodging": record.lodging,
        "meals": record.meals,
        "other_expenses": record.other_expenses,
        "total": record.total,
    })
    return props


def _hospitality_props(record: Any) -> Dict[str, Any]:
    """Build the property dict for one DepartmentalHospitality node."""
    props = _common_props(record, f"hospitality-{record.ref_number}")
    props.update({
        "hospitality_year": record.hospitality_year,
        "attendees": record.attendees,
        "location_en": _clip(record.location_en, 500),
        "location_fr": _clip(record.location_fr, 500),
        "total": record.total,
    })
    return props


def _ingest_dataset(
    neo4j_client: Neo4jClient,
    kind: str,
    label: str,
    fetch_records: Any,
    build_props: Any,
    unique_orgs: Dict[str, Dict[str, str]],
    batch_size: int,
) -> int:
    """Load one expense dataset and create its nodes; return the created count.

    Args:
        neo4j_client: Neo4j client used to create the nodes
        kind: Dataset name used in log messages ("travel" or "hospitality")
        label: Neo4j label given to the created nodes
        fetch_records: Zero-argument callable returning the dataset's records
        build_props: Callable turning one record into a property dict
        unique_orgs: Updated in place with every organization code seen
        batch_size: Batch size passed to ``batch_create_nodes``

    Returns:
        Value returned by ``batch_create_nodes``, or 0 when loading or
        creation raised (the error is logged, not propagated).
    """
    logger.info(f"Loading {kind} expenses data...")
    try:
        records = fetch_records()
        logger.info(f"Found {len(records):,} {kind} records")

        # One timestamp for the whole load: the value is loop-invariant and
        # every node written by this run then shares the same `updated_at`.
        updated_at = _utc_timestamp()

        rows: List[Dict[str, Any]] = []
        for index, record in enumerate(records, start=1):
            # Remember each organization once so Department nodes can be merged later.
            if record.owner_org_title and record.owner_org:
                unique_orgs.setdefault(
                    record.owner_org,
                    {"id": record.owner_org, "name": record.owner_org_title},
                )

            props = build_props(record)
            props["updated_at"] = updated_at
            # Drop None values so absent fields are simply not set on the node.
            rows.append({key: value for key, value in props.items() if value is not None})

            if index % 50000 == 0:
                logger.info(f"Processed {index:,} {kind} records...")

        created = neo4j_client.batch_create_nodes(label, rows, batch_size)
        logger.info(f"Created {created:,} {label} nodes")
        return created
    except Exception as e:
        # Best effort: a failure in one dataset must not abort the other.
        logger.error(f"Could not fetch {kind} data: {e}")
        return 0


def _merge_departments(neo4j_client: Neo4jClient, unique_orgs: Dict[str, Dict[str, str]]) -> int:
    """MERGE one Department node per organization; return how many merges succeeded."""
    logger.info(f"Merging {len(unique_orgs):,} organization nodes...")
    merged = 0
    for org_data in unique_orgs.values():
        try:
            # COALESCE keeps a name that is already set on an existing Department.
            neo4j_client.run_query(
                """
                MERGE (d:Department {id: $id})
                ON CREATE SET d.name = $name
                ON MATCH SET d.name = COALESCE(d.name, $name)
                """,
                org_data
            )
        except Exception as e:
            # Surface the failure: expense nodes of this organization will not
            # get a FROM_DEPARTMENT relationship if its Department is missing.
            logger.error(f"Error creating department {org_data['id']}: {e}")
        else:
            merged += 1
    return merged


def _link_to_departments(neo4j_client: Neo4jClient, label: str, kind: str) -> int:
    """Link `label` nodes to their Department; return the FROM_DEPARTMENT count."""
    logger.info(f"Creating {label} → Department relationships...")
    neo4j_client.run_query(f"""
        MATCH (n:{label})
        WHERE n.organization_code IS NOT NULL
        MATCH (d:Department {{id: n.organization_code}})
        MERGE (n)-[:FROM_DEPARTMENT]->(d)
    """)
    # Counts all FROM_DEPARTMENT relationships leaving `label` nodes.
    rows = neo4j_client.run_query(f"""
        MATCH (:{label})-[r:FROM_DEPARTMENT]->()
        RETURN count(r) as count
    """)
    total = rows[0]["count"] if rows else 0
    logger.info(f" ✓ Created {total:,} {kind}→department relationships")
    return total


def _create_indexes(neo4j_client: Neo4jClient) -> None:
    """Create the lookup and full-text indexes for both expense labels (best effort)."""
    logger.info("Creating indexes...")
    indexes = [
        "CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalTravel) ON (n.id)",
        "CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalTravel) ON (n.ref_number)",
        "CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalTravel) ON (n.organization_code)",
        "CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalTravel) ON (n.travel_year)",
        "CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalTravel) ON (n.name)",
        "CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalTravel) ON (n.total)",
        "CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalHospitality) ON (n.id)",
        "CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalHospitality) ON (n.ref_number)",
        "CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalHospitality) ON (n.organization_code)",
        "CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalHospitality) ON (n.hospitality_year)",
        "CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalHospitality) ON (n.name)",
        "CREATE INDEX IF NOT EXISTS FOR (n:DepartmentalHospitality) ON (n.total)",
        "CREATE FULLTEXT INDEX travel_search_fulltext IF NOT EXISTS FOR (n:DepartmentalTravel) ON EACH [n.name, n.destination_en, n.purpose_en]",
        "CREATE FULLTEXT INDEX hospitality_search_fulltext IF NOT EXISTS FOR (n:DepartmentalHospitality) ON EACH [n.name, n.location_en, n.purpose_en]",
    ]
    for statement in indexes:
        try:
            neo4j_client.run_query(statement)
        except Exception as e:
            # Index creation stays best effort; a failure is only noted.
            logger.debug(f"Index creation note: {e}")
    logger.info("✓ Indexes created")


def ingest_departmental_expenses(neo4j_client: Neo4jClient, batch_size: int = 2000) -> Dict[str, int]:
    """
    Ingest departmental travel and hospitality expenses into Neo4j.

    Performs a full refresh: existing DepartmentalTravel and
    DepartmentalHospitality nodes are deleted, both datasets are loaded through
    DepartmentalExpensesClient and recreated as nodes, Department nodes are
    merged for every organization seen, FROM_DEPARTMENT relationships are
    created, and indexes are ensured.

    Args:
        neo4j_client: Neo4j client
        batch_size: Batch size for node creation

    Returns:
        Dict with counts: ``travel_records``, ``hospitality_records``,
        ``departments_merged`` (Department merges that succeeded),
        ``travel_dept_relationships`` and ``hospitality_dept_relationships``.
    """
    logger.info("=" * 60)
    logger.info("DEPARTMENTAL EXPENSES INGESTION")
    logger.info("=" * 60)

    client = DepartmentalExpensesClient(auto_update=True)
    stats: Dict[str, int] = {}

    # Clear existing data for full refresh.
    # NOTE(review): nodes are deleted before the new data is loaded, so a
    # failed load leaves that label empty until the next successful run.
    _clear_label(neo4j_client, "DepartmentalTravel")
    _clear_label(neo4j_client, "DepartmentalHospitality")

    # Organizations seen in either dataset, keyed by organization code.
    unique_orgs: Dict[str, Dict[str, str]] = {}

    # The lambdas defer the client attribute lookup and call so that any
    # failure is handled inside _ingest_dataset's error handling.
    stats["travel_records"] = _ingest_dataset(
        neo4j_client,
        "travel",
        "DepartmentalTravel",
        lambda: client._load_travel(),
        _travel_props,
        unique_orgs,
        batch_size,
    )
    stats["hospitality_records"] = _ingest_dataset(
        neo4j_client,
        "hospitality",
        "DepartmentalHospitality",
        lambda: client._load_hospitality(),
        _hospitality_props,
        unique_orgs,
        batch_size,
    )

    # Create/update Department nodes, then attach the expense nodes to them.
    stats["departments_merged"] = _merge_departments(neo4j_client, unique_orgs)
    stats["travel_dept_relationships"] = _link_to_departments(
        neo4j_client, "DepartmentalTravel", "travel"
    )
    stats["hospitality_dept_relationships"] = _link_to_departments(
        neo4j_client, "DepartmentalHospitality", "hospitality"
    )

    _create_indexes(neo4j_client)

    logger.info("=" * 60)
    logger.success("✅ DEPARTMENTAL EXPENSES INGESTION COMPLETE")
    logger.info(f"Travel Records: {stats.get('travel_records', 0):,}")
    logger.info(f"Hospitality Records: {stats.get('hospitality_records', 0):,}")
    logger.info(f"Departments Merged: {stats.get('departments_merged', 0):,}")
    logger.info(f"Travel→Dept Relationships: {stats.get('travel_dept_relationships', 0):,}")
    logger.info(f"Hospitality→Dept Relationships: {stats.get('hospitality_dept_relationships', 0):,}")
    logger.info("=" * 60)

    return stats