"""Report generation tools."""
import json
import csv
from pathlib import Path
from typing import List, Dict
from datetime import datetime
class ReportGenerator:
"""Generate reports from investment data."""
def __init__(self, json_archive_path: str, reports_path: str):
"""Initialize report generator.
Args:
json_archive_path: Path to JSON archive directory
reports_path: Path to reports output directory
"""
self.json_archive_path = Path(json_archive_path)
self.reports_path = Path(reports_path)
# Ensure reports directory exists
self.reports_path.mkdir(parents=True, exist_ok=True)
def _calculate_cash_flows_from_transactions(
self, transactions: List[Dict]
) -> tuple:
"""Calculate total deposits and withdrawals from transaction list.
Handles different activity types:
- IBKR uses: DEP, WITH
- Scotia uses: DEPOSIT
Returns (deposits_cad, withdrawals_cad) tuple.
"""
deposits_cad = 0.0
withdrawals_cad = 0.0
for txn in transactions:
activity_type = txn.get("activity_type")
currency = txn.get("currency", "CAD")
net_amount = txn.get("net_amount", 0.0)
if activity_type in ("DEP", "DEPOSIT"):
# Deposits - convert to CAD if needed
if currency == "CAD":
deposits_cad += net_amount
elif activity_type == "WITH":
# Withdrawals are negative in the data
if currency == "CAD":
withdrawals_cad += abs(net_amount)
return deposits_cad, withdrawals_cad
def _fetch_statements(self, account_number: str) -> List[Dict]:
"""Fetch all statements for the given account from JSON archive files.
Returns list of dicts with keys:
- account_number
- statement_date
- opening_balance_cad
- current_balance_cad
- deposits_cad
- withdrawals_cad
"""
if not self.json_archive_path.exists():
raise FileNotFoundError(
f"JSON archive directory not found at {self.json_archive_path}"
)
# Find all JSON files for this account
statements = []
for json_file in self.json_archive_path.glob("*.json"):
try:
with open(json_file, "r") as f:
data = json.load(f)
# Check if this statement belongs to the target account
summary = data.get("summary", {})
if summary.get("account_number") != account_number:
continue
# Get deposits and withdrawals
# Try summary fields first (Scotia, Questrade, TD populate these)
# Fall back to calculating from transactions (IBKR doesn't populate these)
deposits_cad = summary.get("deposits_cad", 0.0)
withdrawals_cad = summary.get("withdrawals_cad", 0.0)
if deposits_cad == 0.0 and withdrawals_cad == 0.0:
# Summary fields not populated, calculate from transactions
transactions = data.get("transactions", [])
(
deposits_cad,
withdrawals_cad,
) = self._calculate_cash_flows_from_transactions(transactions)
# Extract required fields
statements.append(
{
"account_number": summary.get("account_number"),
"statement_date": summary.get("statement_date"),
"opening_balance_cad": summary.get("opening_balance_cad", 0.0),
"current_balance_cad": summary.get("current_balance_cad", 0.0),
"deposits_cad": deposits_cad,
"withdrawals_cad": withdrawals_cad,
}
)
except (json.JSONDecodeError, KeyError) as e:
# Skip problematic files silently
continue
# Sort by statement_date
statements.sort(key=lambda x: x["statement_date"])
return statements
def _calculate_monthly_return(self, statement: Dict) -> Dict:
"""Calculate monthly return using Modified Dietz method (mid-period assumption).
Formula:
Net_Cash_Flow = Deposits - Withdrawals
Investment_Gain = Ending_Balance - Opening_Balance - Net_Cash_Flow
Weighted_Base = Opening_Balance + (Net_Cash_Flow × 0.5)
Return = Investment_Gain / Weighted_Base
Returns dict with calculated fields added.
"""
opening = statement["opening_balance_cad"]
ending = statement["current_balance_cad"]
deposits = statement["deposits_cad"]
withdrawals = statement["withdrawals_cad"]
# Calculate net cash flow
net_cash_flow = deposits - withdrawals
# Calculate investment gain (market movement excluding cash flows)
investment_gain = ending - opening - net_cash_flow
# Calculate weighted base (mid-period assumption: cash flows happen at month midpoint)
weighted_base = opening + (net_cash_flow * 0.5)
# Calculate return as decimal (not percentage)
if weighted_base != 0:
return_decimal = investment_gain / weighted_base
else:
# Handle edge case: starting with zero balance
return_decimal = 0.0 if investment_gain == 0 else float("inf")
# Format statement_date as YYYY-MM
statement_date = statement["statement_date"]
if "T" in statement_date:
statement_date = statement_date.split("T")[0] # Remove time portion
month = datetime.fromisoformat(statement_date).strftime("%Y-%m")
return {
"Account_Number": statement["account_number"],
"Month": month,
"Beginning_Balance_CAD": round(opening, 2),
"Ending_Balance_CAD": round(ending, 2),
"Deposits_CAD": round(deposits, 2),
"Withdrawals_CAD": round(withdrawals, 2),
"Net_Cash_Flow_CAD": round(net_cash_flow, 2),
"Investment_Gain_CAD": round(investment_gain, 2),
"Return": (
round(return_decimal, 6) if return_decimal != float("inf") else "N/A"
),
}
async def update_monthly_returns(
self, account_numbers: List[str] = None
) -> dict:
"""Update monthly returns CSV file for all accounts.
Uses Modified Dietz method to calculate time-weighted returns
excluding deposits and withdrawals.
Args:
account_numbers: List of account numbers to include. If None, includes all known accounts.
Returns:
Dictionary with status, file path, and statistics
"""
# Default accounts if none specified
if account_numbers is None:
account_numbers = [
"U4213209", # IBKR Individual
"U4355172", # IBKR Institution Master
"51516162", # Questrade TFSA
"591-94177-16", # Scotia RRSP
"686X42-V", # TD Direct RESP
]
all_results = []
account_stats = {}
for account_number in account_numbers:
statements = self._fetch_statements(account_number)
if not statements:
continue
# Calculate returns for each month
for statement in statements:
result = self._calculate_monthly_return(statement)
all_results.append(result)
# Track statistics
account_stats[account_number] = {
"statements": len(statements),
"date_range": f"{statements[0]['statement_date'][:10]} to {statements[-1]['statement_date'][:10]}",
}
if not all_results:
return {
"status": "error",
"message": "No data found for any accounts",
}
# Sort all results by account and date
results = sorted(all_results, key=lambda x: (x["Account_Number"], x["Month"]))
# Write to CSV
output_file = self.reports_path / "monthly_returns.csv"
fieldnames = [
"Account_Number",
"Month",
"Beginning_Balance_CAD",
"Ending_Balance_CAD",
"Deposits_CAD",
"Withdrawals_CAD",
"Net_Cash_Flow_CAD",
"Investment_Gain_CAD",
"Return",
]
with open(output_file, "w", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(results)
# Calculate statistics per account
for account in sorted(set(r["Account_Number"] for r in results)):
account_results = [r for r in results if r["Account_Number"] == account]
valid_returns = [
r["Return"] for r in account_results if r["Return"] != "N/A"
]
if valid_returns:
avg_return = sum(valid_returns) / len(valid_returns)
min_return = min(valid_returns)
max_return = max(valid_returns)
account_stats[account].update(
{
"avg_monthly_return": round(avg_return * 100, 2),
"min_monthly_return": round(min_return * 100, 2),
"max_monthly_return": round(max_return * 100, 2),
}
)
return {
"status": "success",
"file_path": str(output_file),
"total_rows": len(results),
"accounts": account_stats,
}