"""Portfolio projection tools using Monte Carlo simulation."""
import math
import random
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from ..database.sqlite_client import SQLiteClient
@dataclass
class PortfolioMetrics:
    """Calculated portfolio metrics.

    All return-like figures are expressed in percent (e.g. ``7.5`` means
    7.5%), matching the values produced by ``PortfolioProjector``.
    """

    total_balance: float
    annual_returns: list[float]  # Historical annual returns (percentages)

    # Performance metrics
    arithmetic_mean: float  # Simple average return
    geometric_mean: float  # CAGR
    median_return: float

    # Risk metrics
    volatility: float  # Standard deviation (annualized)
    max_drawdown: float
    sharpe_ratio: float
    # None when no annual return fell below the risk-free rate, i.e. the
    # downside deviation (and therefore the ratio) is undefined.
    sortino_ratio: Optional[float]
    var_95: float  # 95% Value at Risk
    cvar_95: float  # Conditional VaR (Expected Shortfall)

    # Data info
    years_of_data: int
    calculation_date: str
@dataclass
class ProjectionResult:
    """Container for the outcome of one Monte Carlo projection run.

    Fields are grouped as: run parameters, per-time-step percentile paths,
    final-year percentile values, outcome probabilities, expected values,
    and the return/volatility inputs that drove the simulation.
    """

    # --- Run parameters ---
    initial_value: float
    years: int
    num_simulations: int

    # --- Percentile paths (one entry per time step) ---
    percentile_10: list[float]  # pessimistic band
    percentile_25: list[float]  # below-median band
    percentile_50: list[float]  # median band
    percentile_75: list[float]  # above-median band
    percentile_90: list[float]  # optimistic band

    # --- Final value at each percentile ---
    final_p10: float
    final_p25: float
    final_p50: float
    final_p75: float
    final_p90: float

    # --- Outcome probabilities ---
    prob_positive_return: float  # chance of finishing above the start value
    prob_double: float  # chance of at least doubling
    prob_loss_20pct: float  # chance of losing 20% or more

    # --- Expected values ---
    expected_final_value: float
    expected_cagr: float

    # --- Simulation inputs ---
    mean_return: float
    volatility: float
class PortfolioProjector:
    """Calculate portfolio metrics and run Monte Carlo projections.

    Year-end return records are read through the injected database client,
    combined into one balance-weighted annual return series, and used both
    for the risk/performance metrics and as inputs to the simulation.
    """

    # Minimum number of distinct calendar years required. The sample
    # standard deviation divides by (n - 1), so fewer years cannot be used.
    _MIN_YEARS_OF_DATA = 3

    # Quantiles reported for every projection time step, paired with the
    # keys under which they appear in the result dictionaries.
    _QUANTILES = (0.10, 0.25, 0.50, 0.75, 0.90)
    _BAND_KEYS = ("p10_pessimistic", "p25", "p50_median", "p75", "p90_optimistic")

    # The annotation is quoted so it is not evaluated when the class body
    # is executed.
    def __init__(self, db_client: "SQLiteClient"):
        """Initialize projector.

        Args:
            db_client: SQLite database client
        """
        self.db = db_client
        self.risk_free_rate = 0.04  # 4% annual (current GIC rates)

    async def get_portfolio_metrics(self) -> dict:
        """Calculate risk and performance metrics for the combined portfolio.

        Aggregates year-end (December 31) records from all accounts into one
        balance-weighted return per calendar year, then derives the metrics
        from that series. All return figures are percentages.

        Returns:
            Dictionary with portfolio metrics, or a dictionary containing an
            ``"error"`` key when there are no accounts or fewer than three
            distinct years of year-end data.
        """
        accounts = await self.db.get_accounts()
        if not accounts:
            return {"error": "No accounts found"}

        total_balance = 0.0
        year_end_records = []
        for account in accounts:
            if account.get("current_balance_cad"):
                total_balance += account["current_balance_cad"]

            history = await self.db.get_performance_history(
                account["account_number"], limit=50
            )
            for record in history:
                if record.get("return_ytd") is None:
                    continue
                # A stored null date must not break the substring test.
                date_str = str(record.get("statement_date") or "")
                # Only year-end statements carry a full-year YTD return.
                if "12-31" not in date_str:
                    continue
                year_end_records.append({
                    "year": date_str[:4],
                    "return": record["return_ytd"],
                    "account": account["account_number"],
                    # A stored null balance counts as "no weight".
                    "balance": record.get("current_balance_cad") or 0,
                })

        annual_returns = self._weighted_annual_returns(year_end_records)
        n = len(annual_returns)

        # Check distinct years, not raw records: several accounts reporting
        # the same year collapse into a single observation.
        if n < self._MIN_YEARS_OF_DATA:
            return {
                "error": (
                    f"Insufficient annual data. Need at least "
                    f"{self._MIN_YEARS_OF_DATA} years, found "
                    f"{len(year_end_records)} records covering {n} distinct years"
                ),
                "total_balance": total_balance,
            }

        arithmetic_mean = sum(annual_returns) / n

        # Geometric mean (CAGR): compound the growth factors, take the n-th
        # root. A factor <= 0 means a total loss in some year, for which the
        # root is undefined; report -100% instead.
        growth_factors = [1 + r / 100 for r in annual_returns]
        if min(growth_factors) > 0:
            geometric_mean = (math.prod(growth_factors) ** (1 / n) - 1) * 100
        else:
            geometric_mean = -100.0

        sorted_returns = sorted(annual_returns)
        mid = n // 2
        if n % 2 == 0:
            median_return = (sorted_returns[mid - 1] + sorted_returns[mid]) / 2
        else:
            median_return = sorted_returns[mid]

        # Sample standard deviation (n - 1 denominator).
        variance = sum((r - arithmetic_mean) ** 2 for r in annual_returns) / (n - 1)
        volatility = math.sqrt(variance)

        max_drawdown = self._calculate_max_drawdown_from_returns(annual_returns)

        risk_free_pct = self.risk_free_rate * 100
        excess_return = arithmetic_mean - risk_free_pct
        sharpe_ratio = excess_return / volatility if volatility > 0 else 0

        # Sortino ratio: deviation measured only over years that fell below
        # the risk-free rate. Undefined (None) when there are no such years.
        downside_returns = [r for r in annual_returns if r < risk_free_pct]
        if downside_returns:
            downside_variance = sum(
                (r - risk_free_pct) ** 2 for r in downside_returns
            ) / len(downside_returns)
            downside_deviation = math.sqrt(downside_variance)
            sortino_ratio = (
                excess_return / downside_deviation if downside_deviation > 0 else 0
            )
        else:
            sortino_ratio = None

        var_95 = self._calculate_var(annual_returns, 0.95)
        cvar_95 = self._calculate_cvar(annual_returns, 0.95)

        metrics = PortfolioMetrics(
            total_balance=total_balance,
            annual_returns=annual_returns,
            arithmetic_mean=round(arithmetic_mean, 2),
            geometric_mean=round(geometric_mean, 2),
            median_return=round(median_return, 2),
            volatility=round(volatility, 2),
            max_drawdown=round(max_drawdown, 2),
            sharpe_ratio=round(sharpe_ratio, 2),
            sortino_ratio=round(sortino_ratio, 2) if sortino_ratio is not None else None,
            var_95=round(var_95, 2),
            cvar_95=round(cvar_95, 2),
            years_of_data=n,
            calculation_date=datetime.now().isoformat(),
        )

        return {
            "total_balance_cad": metrics.total_balance,
            "years_of_data": metrics.years_of_data,
            "annual_returns": metrics.annual_returns,
            "performance": {
                "arithmetic_mean": metrics.arithmetic_mean,
                "cagr": metrics.geometric_mean,
                "median": metrics.median_return,
            },
            "risk": {
                "volatility": metrics.volatility,
                "max_drawdown": metrics.max_drawdown,
                "sharpe_ratio": metrics.sharpe_ratio,
                "sortino_ratio": metrics.sortino_ratio,
                "var_95": metrics.var_95,
                "cvar_95": metrics.cvar_95,
            },
            "calculation_date": metrics.calculation_date,
        }

    async def project_portfolio(
        self,
        years: int = 10,
        num_simulations: int = 10000,
        initial_value: Optional[float] = None,
        annual_contribution: float = 0.0,
        use_geometric_brownian_motion: bool = True,
    ) -> dict:
        """Run Monte Carlo simulation to project future portfolio values.

        Args:
            years: Number of years to project (must be at least 1)
            num_simulations: Number of simulation paths (must be at least 1)
            initial_value: Starting value (defaults to current portfolio balance)
            annual_contribution: Annual contribution amount (added at year start)
            use_geometric_brownian_motion: Use GBM model (True) or draw simple
                annual returns from a normal distribution (False)

        Returns:
            Dictionary with projection results and confidence intervals, or a
            dictionary containing an ``"error"`` key when the arguments are
            invalid or the portfolio metrics could not be computed.
            ``"expected_cagr_pct"`` compares the mean final value with the
            initial value only (contributions are not netted out) and is
            ``None`` when the initial value is not positive.
        """
        # Reject inputs that would otherwise surface as ZeroDivisionError
        # (years == 0) or IndexError (no simulated paths) further down.
        if years < 1 or num_simulations < 1:
            return {"error": "years and num_simulations must both be at least 1"}

        metrics = await self.get_portfolio_metrics()
        if "error" in metrics:
            return metrics

        if initial_value is None:
            initial_value = metrics["total_balance_cad"]

        mean_return = metrics["performance"]["arithmetic_mean"] / 100
        volatility = metrics["risk"]["volatility"] / 100

        all_paths = self._simulate_paths(
            initial_value,
            years,
            num_simulations,
            annual_contribution,
            mean_return,
            volatility,
            use_geometric_brownian_motion,
        )

        # Percentile bands across simulations at each time step.
        bands = {key: [] for key in self._BAND_KEYS}
        for t in range(years + 1):
            values_at_t = sorted(path[t] for path in all_paths)
            for key, quantile in zip(self._BAND_KEYS, self._QUANTILES):
                bands[key].append(
                    round(self._percentile_value(values_at_t, quantile), 2)
                )

        final_values = [path[-1] for path in all_paths]
        sorted_finals = sorted(final_values)
        n = len(final_values)

        prob_positive = sum(1 for v in final_values if v > initial_value) / n
        prob_double = sum(1 for v in final_values if v > 2 * initial_value) / n
        prob_loss_20 = sum(1 for v in final_values if v < 0.8 * initial_value) / n

        expected_final = sum(final_values) / n
        if initial_value > 0:
            expected_cagr = round(
                ((expected_final / initial_value) ** (1 / years) - 1) * 100, 2
            )
        else:
            # A growth rate relative to a zero or negative base is undefined.
            expected_cagr = None

        # The last entry of every band is the final-year percentile.
        final_summary = {key: bands[key][-1] for key in self._BAND_KEYS}
        final_summary["expected_mean"] = round(expected_final, 2)

        return {
            "initial_value": initial_value,
            "years": years,
            "num_simulations": num_simulations,
            "annual_contribution": annual_contribution,
            "model": "Geometric Brownian Motion" if use_geometric_brownian_motion else "Historical Sampling",
            "input_parameters": {
                "mean_return_pct": round(mean_return * 100, 2),
                "volatility_pct": round(volatility * 100, 2),
            },
            "percentile_paths": bands,
            "final_values": final_summary,
            "probabilities": {
                "positive_return": round(prob_positive * 100, 1),
                "double_money": round(prob_double * 100, 1),
                "lose_20_percent": round(prob_loss_20 * 100, 1),
            },
            "expected_cagr_pct": expected_cagr,
            "summary": self._generate_summary(
                initial_value, years, sorted_finals, prob_positive, annual_contribution
            ),
        }

    @staticmethod
    def _weighted_annual_returns(records: list[dict]) -> list[float]:
        """Collapse per-account year-end records into one return per year.

        Args:
            records: Dicts with ``"year"``, ``"return"`` and ``"balance"`` keys

        Returns:
            One return per distinct year, ordered by year. Each is weighted by
            balance when the year's balances sum to a positive number,
            otherwise it is the simple average of that year's returns.
        """
        by_year = {}
        for rec in records:
            by_year.setdefault(rec["year"], []).append((rec["return"], rec["balance"]))

        annual_returns = []
        for year in sorted(by_year):
            pairs = by_year[year]
            total_bal = sum(balance for _, balance in pairs)
            if total_bal > 0:
                annual_returns.append(sum(r * b for r, b in pairs) / total_bal)
            else:
                annual_returns.append(sum(r for r, _ in pairs) / len(pairs))
        return annual_returns

    @staticmethod
    def _simulate_paths(
        initial_value: float,
        years: int,
        num_simulations: int,
        annual_contribution: float,
        mean_return: float,
        volatility: float,
        use_gbm: bool,
    ) -> list[list[float]]:
        """Generate simulated value paths using the module-level ``random``.

        Args:
            initial_value: Starting value of every path
            years: Number of annual steps per path
            num_simulations: Number of paths to generate
            annual_contribution: Amount added before each year's return
            mean_return: Mean annual return as a fraction (0.07 = 7%)
            volatility: Annual standard deviation as a fraction
            use_gbm: True for the log-normal GBM step, False for a normally
                distributed simple return

        Returns:
            ``num_simulations`` lists, each of length ``years + 1`` and
            starting with ``initial_value``.
        """
        # GBM annual step: S_new = S * exp((mu - 0.5*sigma^2) + sigma * Z).
        # The drift term does not depend on the path, so compute it once.
        drift = mean_return - 0.5 * volatility ** 2

        paths = []
        for _ in range(num_simulations):
            value = initial_value
            path = [value]
            for _ in range(years):
                value += annual_contribution
                if use_gbm:
                    value *= math.exp(drift + volatility * random.gauss(0, 1))
                else:
                    value *= 1 + random.gauss(mean_return, volatility)
                # Floor the running value itself, not just the recorded one,
                # so a wiped-out path does not keep compounding a negative
                # balance in later years.
                value = max(0.0, value)
                path.append(value)
            paths.append(path)
        return paths

    @staticmethod
    def _percentile_value(sorted_values: list[float], quantile: float) -> float:
        """Return the element at position ``int(len * quantile)``.

        Args:
            sorted_values: Non-empty list sorted in ascending order
            quantile: Fraction in [0, 1]

        Returns:
            The selected element; the index is clamped to the last element.
        """
        last = len(sorted_values) - 1
        return sorted_values[min(int(len(sorted_values) * quantile), last)]

    def _calculate_max_drawdown_from_returns(self, returns: list[float]) -> float:
        """Calculate maximum drawdown from annual returns.

        Args:
            returns: List of annual returns (percentages)

        Returns:
            Maximum drawdown as percentage (zero or a negative number)
        """
        # Build a wealth index starting at 100 and track the largest
        # percentage fall from the running peak.
        wealth = 100.0
        peak = wealth
        max_drawdown = 0
        for r in returns:
            wealth *= 1 + r / 100
            if wealth > peak:
                peak = wealth
            drawdown = (wealth - peak) / peak * 100
            if drawdown < max_drawdown:
                max_drawdown = drawdown
        return max_drawdown

    def _calculate_var(self, returns: list[float], confidence: float) -> float:
        """Calculate Value at Risk using historical method.

        Args:
            returns: List of returns (percentages), must not be empty
            confidence: Confidence level (0.95 = 95%)

        Returns:
            VaR as percentage (typically negative)

        Raises:
            ValueError: If ``returns`` is empty
        """
        if not returns:
            raise ValueError("returns must not be empty")
        sorted_returns = sorted(returns)
        # Clamp at 0 so a confidence above 1 cannot produce a negative index
        # that would wrap around to the best returns.
        index = max(0, int(len(sorted_returns) * (1 - confidence)))
        return sorted_returns[index] if index < len(sorted_returns) else sorted_returns[0]

    def _calculate_cvar(self, returns: list[float], confidence: float) -> float:
        """Calculate Conditional VaR (Expected Shortfall).

        Average of the returns that are no better than the VaR.

        Args:
            returns: List of returns (percentages), must not be empty
            confidence: Confidence level (0.95 = 95%)

        Returns:
            CVaR as percentage (typically negative)
        """
        var = self._calculate_var(returns, confidence)
        worse_returns = [r for r in returns if r <= var]
        return sum(worse_returns) / len(worse_returns) if worse_returns else var

    def _generate_summary(
        self,
        initial: float,
        years: int,
        sorted_finals: list[float],
        prob_positive: float,
        annual_contribution: float,
    ) -> str:
        """Generate human-readable projection summary.

        Args:
            initial: Initial portfolio value
            years: Number of years projected
            sorted_finals: Final values from simulations, sorted ascending
                and non-empty
            prob_positive: Probability of positive return, as a fraction
            annual_contribution: Annual contribution amount

        Returns:
            Summary string
        """
        p10 = self._percentile_value(sorted_finals, 0.10)
        p50 = self._percentile_value(sorted_finals, 0.50)
        p90 = self._percentile_value(sorted_finals, 0.90)

        def fmt(v):
            # Values of a million or more are abbreviated, e.g. "$1.25M".
            if v >= 1_000_000:
                return f"${v/1_000_000:.2f}M"
            return f"${v:,.0f}"

        contrib_text = ""
        if annual_contribution > 0:
            total_contrib = annual_contribution * years
            contrib_text = f" (plus ${total_contrib:,.0f} in contributions)"

        return "\n".join([
            f"In {years} years, starting from {fmt(initial)}{contrib_text}:",
            f"- Pessimistic (10th %ile): {fmt(p10)}",
            f"- Median (50th %ile): {fmt(p50)}",
            f"- Optimistic (90th %ile): {fmt(p90)}",
            f"There is a {prob_positive*100:.0f}% probability of ending with more than you started.",
        ])