"""Monte Carlo simulation tools for portfolio projections."""
from __future__ import annotations
import logging
import math
from datetime import datetime
from pathlib import Path
from typing import Optional
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from ..database.sqlite_client import SQLiteClient
from ..data.sp500_fetcher import SP500DataFetcher
logger = logging.getLogger(__name__)
class MonteCarloSimulator:
"""Monte Carlo simulation for portfolio projections."""
def __init__(self, db_client: SQLiteClient, reports_path: str):
"""Initialize Monte Carlo simulator.
Args:
db_client: SQLite database client
reports_path: Path to reports directory
"""
self.db = db_client
self.reports_path = Path(reports_path)
async def run_stress_test_simulation(
self,
n_simulations: int = 10000,
projection_years: int = 5,
account_numbers: Optional[list[str]] = None,
percentiles: Optional[list[float]] = None
) -> dict:
"""Run Monte Carlo stress test with multiple adverse scenarios.
Projects portfolio under baseline conditions and multiple stress scenarios:
- Market Crash: Sudden 30% decline
- Bear Market: 12 months of -5% monthly returns
- Volatility Spike: 3x normal volatility for 6 months
Stress events occur at random times during projection (different for each simulation).
Args:
n_simulations: Number of simulation paths (default: 10,000)
projection_years: Years to project (default: 5)
account_numbers: Accounts to include (default: all)
percentiles: Percentiles to calculate (default: [10,25,50,75,90])
Returns:
Dictionary with baseline and stressed projections, comparisons, and visualizations
"""
# Lazy-load heavy dependencies
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
if percentiles is None:
percentiles = [10, 25, 50, 75, 90]
# Load portfolio data
portfolio_data = self._load_portfolio_returns(account_numbers)
if portfolio_data is None or len(portfolio_data) < 12:
return {
"error": "Insufficient historical data. Need at least 12 months of returns.",
"data_points": len(portfolio_data) if portfolio_data is not None else 0
}
# Calculate statistics
stats = self._calculate_statistics(portfolio_data['returns'].values)
initial_value = portfolio_data['ending_balance'].iloc[-1]
n_months = projection_years * 12
# Run baseline simulation
baseline_paths = self._simulate_paths_normal(
initial_value=initial_value,
mean=stats['mean_monthly_return'],
volatility=stats['monthly_volatility'],
n_simulations=n_simulations,
n_months=n_months
)
# Define stress scenarios
stress_scenarios = {
'market_crash': {
'name': 'Market Crash',
'description': '30% immediate decline at random point',
'type': 'crash',
'magnitude': -0.30
},
'bear_market': {
'name': 'Bear Market',
'description': '12 months of -5% monthly returns',
'type': 'bear',
'monthly_return': -0.05,
'duration_months': 12
},
'volatility_spike': {
'name': 'Volatility Spike',
'description': '3x volatility for 6 months',
'type': 'volatility',
'multiplier': 3.0,
'duration_months': 6
}
}
# Run stressed simulations
stressed_results = {}
for scenario_key, scenario_config in stress_scenarios.items():
stressed_paths = self._simulate_paths_with_stress(
initial_value=initial_value,
mean=stats['mean_monthly_return'],
volatility=stats['monthly_volatility'],
n_simulations=n_simulations,
n_months=n_months,
stress_config=scenario_config
)
stressed_results[scenario_key] = {
'config': scenario_config,
'paths': stressed_paths,
'percentiles': self._calculate_percentiles(
simulation_paths=stressed_paths,
percentiles=percentiles,
projection_years=projection_years
)
}
# Calculate baseline percentiles
baseline_percentiles = self._calculate_percentiles(
simulation_paths=baseline_paths,
percentiles=percentiles,
projection_years=projection_years
)
# Generate visualizations
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
viz_paths = self._generate_stress_test_visualizations(
baseline_paths=baseline_paths,
stressed_results=stressed_results,
baseline_percentiles=baseline_percentiles,
initial_value=initial_value,
timestamp=timestamp
)
# Save data files
data_paths = self._save_stress_test_data(
baseline_percentiles=baseline_percentiles,
stressed_results=stressed_results,
stats=stats,
initial_value=initial_value,
n_simulations=n_simulations,
projection_years=projection_years,
timestamp=timestamp
)
# Get unique accounts
all_accounts = []
for accounts_arr in portfolio_data['account_numbers']:
all_accounts.extend(accounts_arr.tolist() if hasattr(accounts_arr, 'tolist') else accounts_arr)
unique_accounts = list(set(all_accounts))
# Compile results
return {
"simulation_metadata": {
"timestamp": datetime.now().isoformat(),
"n_simulations": n_simulations,
"projection_years": projection_years,
"projection_months": n_months,
"accounts_included": unique_accounts,
"initial_portfolio_value": round(initial_value, 2)
},
"input_statistics": {
"historical_periods": stats['n_periods'],
"date_range": f"{portfolio_data['month'].iloc[0]} to {portfolio_data['month'].iloc[-1]}",
"mean_monthly_return": round(stats['mean_monthly_return'], 6),
"monthly_volatility": round(stats['monthly_volatility'], 6),
"annualized_return": round(stats['annualized_return'], 6),
"annualized_volatility": round(stats['annualized_volatility'], 6)
},
"baseline": baseline_percentiles,
"stress_scenarios": {
key: {
"name": val['config']['name'],
"description": val['config']['description'],
"projections": val['percentiles']
}
for key, val in stressed_results.items()
},
"visualizations": viz_paths,
"data_files": data_paths
}
async def run_portfolio_simulation(
self,
n_simulations: int = 10000,
projection_years: int = 5,
account_numbers: Optional[list[str]] = None,
percentiles: Optional[list[float]] = None,
method: str = "normal"
) -> dict:
"""Run Monte Carlo simulation for portfolio projection.
Projects future portfolio value using historical returns.
No contributions or withdrawals assumed during projection period.
Two methods available:
- 'normal': Geometric Brownian Motion with parametric normal distribution
- 'historical': Bootstrap from S&P 500 history (non-parametric)
Args:
n_simulations: Number of simulation paths (default: 10,000)
projection_years: Years to project (default: 5)
account_numbers: Accounts to include (default: all)
percentiles: Percentiles to calculate (default: [10,25,50,75,90])
method: Simulation method - 'normal' or 'historical' (default: 'normal')
Returns:
Dictionary with projections, statistics, and visualization paths
"""
if percentiles is None:
percentiles = [10, 25, 50, 75, 90]
# Load and aggregate portfolio returns
portfolio_data = self._load_portfolio_returns(account_numbers)
if portfolio_data is None or len(portfolio_data) < 12:
return {
"error": "Insufficient historical data. Need at least 12 months of returns.",
"data_points": len(portfolio_data) if portfolio_data is not None else 0
}
# Calculate statistics from historical returns
stats = self._calculate_statistics(portfolio_data['returns'].values)
# Get current portfolio value
initial_value = portfolio_data['ending_balance'].iloc[-1]
# Run simulations (method-specific)
n_months = projection_years * 12
scaling_factors = None
if method == "normal":
simulation_paths = self._simulate_paths_normal(
initial_value=initial_value,
mean=stats['mean_monthly_return'],
volatility=stats['monthly_volatility'],
n_simulations=n_simulations,
n_months=n_months
)
elif method == "historical":
simulation_paths, scaling_factors = await self._simulate_paths_historical(
initial_value=initial_value,
portfolio_stats=stats,
portfolio_returns=portfolio_data['returns'].values,
n_simulations=n_simulations,
n_months=n_months
)
else:
return {"error": f"Invalid method: {method}. Use 'normal' or 'historical'"}
# Calculate percentiles for each year
year_by_year = self._calculate_percentiles(
simulation_paths=simulation_paths,
percentiles=percentiles,
projection_years=projection_years
)
# Generate visualizations and save data
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
viz_paths = self._generate_visualizations(
simulation_paths=simulation_paths,
percentile_data=year_by_year,
initial_value=initial_value,
timestamp=timestamp
)
data_paths = self._save_data_files(
year_by_year=year_by_year,
stats=stats,
initial_value=initial_value,
n_simulations=n_simulations,
projection_years=projection_years,
portfolio_data=portfolio_data,
timestamp=timestamp
)
# Get unique account numbers (flatten the nested arrays)
all_accounts = []
for accounts_arr in portfolio_data['account_numbers']:
all_accounts.extend(accounts_arr.tolist() if hasattr(accounts_arr, 'tolist') else accounts_arr)
unique_accounts = list(set(all_accounts))
# Compile results
result = {
"simulation_metadata": {
"timestamp": datetime.now().isoformat(),
"method": method,
"n_simulations": n_simulations,
"projection_years": projection_years,
"projection_months": n_months,
"accounts_included": unique_accounts,
"initial_portfolio_value": round(initial_value, 2)
},
"input_statistics": {
"historical_periods": stats['n_periods'],
"date_range": f"{portfolio_data['month'].iloc[0]} to {portfolio_data['month'].iloc[-1]}",
"mean_monthly_return": round(stats['mean_monthly_return'], 6),
"monthly_volatility": round(stats['monthly_volatility'], 6),
"annualized_return": round(stats['annualized_return'], 6),
"annualized_volatility": round(stats['annualized_volatility'], 6)
},
"year_by_year_projections": year_by_year,
"visualizations": viz_paths,
"data_files": data_paths
}
# Add scaling factors if historical method
if method == "historical" and scaling_factors is not None:
result["scaling_factors"] = {
"volatility_ratio": round(scaling_factors['volatility_ratio'], 4),
"alpha_monthly": round(scaling_factors['alpha'], 6),
"alpha_annualized": round(scaling_factors['alpha_annualized'], 6),
"sp500_mean_monthly": round(scaling_factors['sp500_mean'], 6),
"sp500_volatility": round(scaling_factors['sp500_vol'], 6),
"sp500_periods": scaling_factors['sp500_periods']
}
return result
def _load_portfolio_returns(self, account_numbers: Optional[list[str]]) -> Optional[pd.DataFrame]:
"""Load monthly_returns.csv and aggregate to portfolio level.
Args:
account_numbers: List of account numbers to include (None for all)
Returns:
DataFrame with portfolio-level monthly returns or None if file not found
"""
import pandas as pd
csv_path = self.reports_path / "monthly_returns.csv"
if not csv_path.exists():
return None
# Read CSV
df = pd.read_csv(csv_path)
# Filter by account numbers if specified
if account_numbers:
df = df[df['Account_Number'].isin(account_numbers)]
# Get unique months and sort them chronologically
unique_months = sorted(df['Month'].unique())
# Group by month and aggregate using Modified Dietz method
monthly_portfolio = []
for month in unique_months:
month_data = df[df['Month'] == month]
# Calculate portfolio-level return using Modified Dietz
total_beginning = month_data['Beginning_Balance_CAD'].sum()
total_ending = month_data['Ending_Balance_CAD'].sum()
total_net_flow = month_data['Net_Cash_Flow_CAD'].sum()
# Modified Dietz: (Ending - Beginning - Net_Flow) / (Beginning + 0.5 * Net_Flow)
denominator = total_beginning + 0.5 * total_net_flow
if denominator != 0:
portfolio_return = (total_ending - total_beginning - total_net_flow) / denominator
else:
portfolio_return = 0.0
monthly_portfolio.append({
'month': month,
'beginning_balance': total_beginning,
'ending_balance': total_ending,
'net_cash_flow': total_net_flow,
'returns': portfolio_return,
'account_numbers': month_data['Account_Number'].unique()
})
return pd.DataFrame(monthly_portfolio)
def _calculate_statistics(self, returns: np.ndarray) -> dict:
"""Calculate mean, volatility, and distribution parameters.
Args:
returns: Array of monthly returns (decimal format)
Returns:
Dictionary of statistical measures
"""
import numpy as np
# Calculate basic statistics
mean_monthly = np.mean(returns)
monthly_vol = np.std(returns, ddof=1) # Sample standard deviation
# Annualize
annualized_return = mean_monthly * 12
annualized_vol = monthly_vol * math.sqrt(12)
return {
'n_periods': len(returns),
'mean_monthly_return': mean_monthly,
'monthly_volatility': monthly_vol,
'annualized_return': annualized_return,
'annualized_volatility': annualized_vol
}
def _simulate_paths_normal(
self,
initial_value: float,
mean: float,
volatility: float,
n_simulations: int,
n_months: int
) -> np.ndarray:
"""Generate simulation paths using Geometric Brownian Motion (parametric).
This is the original implementation using normal distribution assumption.
Args:
initial_value: Starting portfolio value
mean: Mean monthly return
volatility: Monthly volatility (standard deviation)
n_simulations: Number of simulation paths
n_months: Number of months to project
Returns:
Array of shape (n_simulations, n_months + 1) with portfolio values
"""
import numpy as np
# Initialize paths array (including initial value at t=0)
paths = np.zeros((n_simulations, n_months + 1))
paths[:, 0] = initial_value
# Generate random returns for all simulations and time steps
# Shape: (n_simulations, n_months)
random_returns = np.random.normal(mean, volatility, (n_simulations, n_months))
# Apply compound growth
for t in range(n_months):
paths[:, t + 1] = paths[:, t] * (1 + random_returns[:, t])
return paths
async def _simulate_paths_historical(
self,
initial_value: float,
portfolio_stats: dict,
portfolio_returns: np.ndarray,
n_simulations: int,
n_months: int
) -> tuple[np.ndarray, dict]:
"""Generate simulation paths using historical bootstrap method.
Steps:
1. Fetch S&P 500 historical returns (40+ years)
2. Calculate scaling factors:
- volatility_ratio = portfolio_vol / sp500_vol
- alpha = portfolio_mean_return - sp500_mean_return
3. For each simulation:
- Randomly sample from S&P 500 historical returns (with replacement)
- Scale: adjusted_return = (sampled_return × volatility_ratio) + alpha
- Apply to portfolio value
Args:
initial_value: Starting portfolio value
portfolio_stats: Dictionary with portfolio statistics
portfolio_returns: Array of historical portfolio returns
n_simulations: Number of simulation paths
n_months: Number of months to project
Returns:
Tuple of (simulation paths array, scaling factors dictionary)
"""
import numpy as np
# 1. Fetch S&P 500 data
logger.info("Fetching S&P 500 historical data for bootstrap simulation...")
sp500_data = await self.sp500_fetcher.get_sp500_returns(min_years=40)
sp500_returns = sp500_data['return'].values
logger.info(f"Using {len(sp500_returns)} months of S&P 500 history for simulation")
# 2. Calculate scaling factors
scaling_factors = self._calculate_scaling_factors(portfolio_returns, sp500_returns)
# Log scaling factors for diagnostics
logger.info(
f"Scaling factors: volatility_ratio={scaling_factors['volatility_ratio']:.2f}, "
f"alpha={scaling_factors['alpha']*100:.2f}% monthly ({scaling_factors['alpha_annualized']*100:.2f}% annual)"
)
# Warn for extreme values
if scaling_factors['volatility_ratio'] > 5.0 or scaling_factors['volatility_ratio'] < 0.2:
logger.warning(
f"Unusual volatility ratio: {scaling_factors['volatility_ratio']:.2f}. "
f"Portfolio may be extremely different from S&P 500."
)
if abs(scaling_factors['alpha_annualized']) > 0.50:
logger.warning(
f"Extreme alpha detected: {scaling_factors['alpha_annualized']*100:.1f}% annually. "
f"Consider whether portfolio can sustain this."
)
# 3. Initialize paths array
paths = np.zeros((n_simulations, n_months + 1))
paths[:, 0] = initial_value
# 4. Run simulations
for t in range(n_months):
# Randomly sample from S&P 500 historical returns (with replacement)
# Shape: (n_simulations,)
sampled_returns = np.random.choice(sp500_returns, size=n_simulations, replace=True)
# Scale returns by volatility ratio and add alpha
adjusted_returns = (sampled_returns * scaling_factors['volatility_ratio']) + scaling_factors['alpha']
# Apply compound growth
paths[:, t + 1] = paths[:, t] * (1 + adjusted_returns)
return paths, scaling_factors
def _calculate_scaling_factors(
self,
portfolio_returns: np.ndarray,
sp500_returns: np.ndarray
) -> dict:
"""Calculate volatility ratio and alpha for historical scaling.
Args:
portfolio_returns: Array of portfolio monthly returns
sp500_returns: Array of S&P 500 monthly returns
Returns:
Dictionary with scaling factors and diagnostics
"""
import numpy as np
# Calculate portfolio statistics
portfolio_mean = np.mean(portfolio_returns)
portfolio_vol = np.std(portfolio_returns, ddof=1)
# Calculate S&P 500 statistics
sp500_mean = np.mean(sp500_returns)
sp500_vol = np.std(sp500_returns, ddof=1)
# Calculate scaling factors
if sp500_vol != 0:
volatility_ratio = portfolio_vol / sp500_vol
else:
logger.warning("S&P 500 volatility is zero, using volatility_ratio = 1.0")
volatility_ratio = 1.0
alpha = portfolio_mean - sp500_mean
return {
'portfolio_mean': portfolio_mean,
'portfolio_vol': portfolio_vol,
'sp500_mean': sp500_mean,
'sp500_vol': sp500_vol,
'volatility_ratio': volatility_ratio,
'alpha': alpha,
'alpha_annualized': alpha * 12,
'portfolio_periods': len(portfolio_returns),
'sp500_periods': len(sp500_returns)
}
def _calculate_percentiles(
self,
simulation_paths: np.ndarray,
percentiles: list[float],
projection_years: int
) -> list[dict]:
"""Calculate percentile projections for each year.
Args:
simulation_paths: Array of simulation paths
percentiles: List of percentiles to calculate (e.g., [10, 25, 50, 75, 90])
projection_years: Number of years projected
Returns:
List of dictionaries with year-by-year percentile projections
"""
import numpy as np
year_by_year = []
for year in range(1, projection_years + 1):
month_idx = year * 12 # Index in the paths array
year_values = simulation_paths[:, month_idx]
# Calculate percentiles
percentile_dict = {}
for p in percentiles:
percentile_dict[f"p{int(p)}"] = round(np.percentile(year_values, p), 2)
# Calculate statistics
year_by_year.append({
"year": year,
"months": month_idx,
"percentiles": percentile_dict,
"statistics": {
"mean": round(np.mean(year_values), 2),
"std_dev": round(np.std(year_values), 2),
"min": round(np.min(year_values), 2),
"max": round(np.max(year_values), 2)
}
})
return year_by_year
def _generate_visualizations(
self,
simulation_paths: np.ndarray,
percentile_data: list[dict],
initial_value: float,
timestamp: str
) -> dict[str, str]:
"""Create and save visualization charts.
Args:
simulation_paths: Array of simulation paths
percentile_data: Year-by-year percentile projections
initial_value: Starting portfolio value
timestamp: Timestamp string for directory naming
Returns:
Dictionary with paths to generated chart files
"""
# Create output directory
charts_dir = self.reports_path / "monte_carlo" / timestamp
charts_dir.mkdir(parents=True, exist_ok=True)
# Create "latest" directory
latest_dir = self.reports_path / "monte_carlo" / "latest"
latest_dir.mkdir(parents=True, exist_ok=True)
# 1. Fan Chart - Percentile bands over time
self._create_fan_chart(
simulation_paths=simulation_paths,
initial_value=initial_value,
save_path=charts_dir / "fan_chart.png"
)
self._create_fan_chart(
simulation_paths=simulation_paths,
initial_value=initial_value,
save_path=latest_dir / "fan_chart.png"
)
# 2. Distribution Histogram - Final values at end of projection
self._create_distribution_histogram(
final_values=simulation_paths[:, -1],
percentile_data=percentile_data[-1],
save_path=charts_dir / "distribution_histogram.png"
)
self._create_distribution_histogram(
final_values=simulation_paths[:, -1],
percentile_data=percentile_data[-1],
save_path=latest_dir / "distribution_histogram.png"
)
# 3. Confidence Interval Chart - Box plots for each year
self._create_confidence_intervals(
simulation_paths=simulation_paths,
percentile_data=percentile_data,
save_path=charts_dir / "confidence_intervals.png"
)
self._create_confidence_intervals(
simulation_paths=simulation_paths,
percentile_data=percentile_data,
save_path=latest_dir / "confidence_intervals.png"
)
return {
"fan_chart": str(charts_dir / "fan_chart.png"),
"distribution": str(charts_dir / "distribution_histogram.png"),
"confidence_intervals": str(charts_dir / "confidence_intervals.png")
}
def _create_fan_chart(
    self,
    simulation_paths: np.ndarray,
    initial_value: float,
    save_path: Path
) -> None:
    """Render the percentile fan chart and write it to ``save_path``.

    Args:
        simulation_paths: Array of simulation paths.
        initial_value: Starting portfolio value.
        save_path: Destination file for the chart.
    """
    fig, ax = plt.subplots(figsize=(12, 7))
    month_axis = np.arange(simulation_paths.shape[1])
    # Per-month percentile curves for the shaded bands and median line.
    bands = {p: np.percentile(simulation_paths, p, axis=0) for p in (10, 25, 50, 75, 90)}
    ax.fill_between(month_axis, bands[10], bands[90], alpha=0.2, color='blue', label='10th-90th percentile')
    ax.fill_between(month_axis, bands[25], bands[75], alpha=0.3, color='blue', label='25th-75th percentile')
    ax.plot(month_axis, bands[50], color='blue', linewidth=2, label='Median (50th percentile)')
    # Reference line at the starting value.
    ax.axhline(y=initial_value, color='green', linestyle='--', linewidth=1, label='Initial Value')
    ax.set_xlabel('Months', fontsize=12)
    ax.set_ylabel('Portfolio Value (CAD)', fontsize=12)
    ax.set_title('Monte Carlo Simulation - Portfolio Projection Fan Chart', fontsize=14, fontweight='bold')
    ax.legend(loc='upper left')
    ax.grid(True, alpha=0.3)
    # Currency formatting on the value axis.
    ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'${x:,.0f}'))
    plt.tight_layout()
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.close()
def _create_distribution_histogram(
    self,
    final_values: np.ndarray,
    percentile_data: dict,
    save_path: Path
) -> None:
    """Render a histogram of final portfolio values with percentile markers.

    Args:
        final_values: Final portfolio value from every simulation path.
        percentile_data: Percentile entry for the final projected year.
        save_path: Destination file for the chart.
    """
    fig, ax = plt.subplots(figsize=(12, 7))
    ax.hist(final_values, bins=50, alpha=0.7, color='blue', edgecolor='black')
    # Vertical reference lines at each reported percentile; outer bands in
    # red, inner in orange, median in green.
    line_colors = {'p10': 'red', 'p25': 'orange', 'p50': 'green', 'p75': 'orange', 'p90': 'red'}
    for pct_key, pct_value in percentile_data['percentiles'].items():
        ax.axvline(
            x=pct_value,
            color=line_colors.get(pct_key, 'gray'),
            linestyle='--',
            linewidth=2,
            label=f'{pct_key.upper()}: ${pct_value:,.0f}'
        )
    ax.set_xlabel('Final Portfolio Value (CAD)', fontsize=12)
    ax.set_ylabel('Frequency', fontsize=12)
    ax.set_title(f'Distribution of Portfolio Values (Year {percentile_data["year"]})', fontsize=14, fontweight='bold')
    ax.legend(loc='upper right')
    ax.grid(True, alpha=0.3, axis='y')
    # Currency formatting on the value axis.
    ax.xaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'${x:,.0f}'))
    plt.tight_layout()
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.close()
def _create_confidence_intervals(
    self,
    simulation_paths: np.ndarray,
    percentile_data: list[dict],
    save_path: Path
) -> None:
    """Render per-year box plots of simulated values and save the chart.

    Args:
        simulation_paths: Array of simulation paths.
        percentile_data: Year-by-year percentile projections.
        save_path: Destination file for the chart.
    """
    fig, ax = plt.subplots(figsize=(12, 7))
    # Pull the simulated values at each year-end column.
    years = [entry['year'] for entry in percentile_data]
    samples = [simulation_paths[:, entry['months']] for entry in percentile_data]
    ax.boxplot(
        samples, positions=years, widths=0.6, patch_artist=True,
        showmeans=True, meanline=True,
        boxprops=dict(facecolor='lightblue', alpha=0.7),
        medianprops=dict(color='blue', linewidth=2),
        meanprops=dict(color='red', linewidth=2, linestyle='--'),
        whiskerprops=dict(linewidth=1.5),
        capprops=dict(linewidth=1.5)
    )
    ax.set_xlabel('Year', fontsize=12)
    ax.set_ylabel('Portfolio Value (CAD)', fontsize=12)
    ax.set_title('Portfolio Value Confidence Intervals by Year', fontsize=14, fontweight='bold')
    ax.set_xticks(years)
    ax.grid(True, alpha=0.3, axis='y')
    # Currency formatting on the value axis.
    ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'${x:,.0f}'))
    # Manual legend: box plots do not label their median/mean lines.
    legend_handles = [
        mpatches.Patch(color='blue', label='Median'),
        mpatches.Patch(color='red', label='Mean'),
    ]
    ax.legend(handles=legend_handles, loc='upper left')
    plt.tight_layout()
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.close()
def _save_data_files(
self,
year_by_year: list[dict],
stats: dict,
initial_value: float,
n_simulations: int,
projection_years: int,
portfolio_data: pd.DataFrame,
timestamp: str
) -> dict[str, str]:
"""Save simulation data to CSV files.
Args:
year_by_year: Year-by-year percentile projections
stats: Input statistics
initial_value: Initial portfolio value
n_simulations: Number of simulations
projection_years: Years projected
portfolio_data: Historical portfolio returns
timestamp: Timestamp string for directory naming
Returns:
Dictionary with paths to generated data files
"""
# Create output directory (same as charts)
data_dir = self.reports_path / "monte_carlo" / timestamp
data_dir.mkdir(parents=True, exist_ok=True)
# Create "latest" directory
latest_dir = self.reports_path / "monte_carlo" / "latest"
latest_dir.mkdir(parents=True, exist_ok=True)
# 1. Save year-by-year projections
projections_rows = []
for year_data in year_by_year:
year = year_data['year']
p = year_data['percentiles']
s = year_data['statistics']
projections_rows.append({
'Year': year,
'Months': year_data['months'],
'P10_Percentile': p['p10'],
'P25_Percentile': p['p25'],
'P50_Median': p['p50'],
'P75_Percentile': p['p75'],
'P90_Percentile': p['p90'],
'Mean': s['mean'],
'Std_Dev': s['std_dev'],
'Min': s['min'],
'Max': s['max']
})
projections_df = pd.DataFrame(projections_rows)
projections_path = data_dir / "projections.csv"
projections_df.to_csv(projections_path, index=False)
# Also save to latest
projections_df.to_csv(latest_dir / "projections.csv", index=False)
# 2. Save summary metadata and statistics
summary_rows = [
{'Metric': 'Simulation Date', 'Value': datetime.now().isoformat()},
{'Metric': 'Number of Simulations', 'Value': n_simulations},
{'Metric': 'Projection Years', 'Value': projection_years},
{'Metric': 'Projection Months', 'Value': projection_years * 12},
{'Metric': 'Initial Portfolio Value (CAD)', 'Value': f'${initial_value:,.2f}'},
{'Metric': '', 'Value': ''}, # Blank line
{'Metric': 'Historical Data Points', 'Value': stats['n_periods']},
{'Metric': 'Date Range', 'Value': f"{portfolio_data['month'].iloc[0]} to {portfolio_data['month'].iloc[-1]}"},
{'Metric': 'Mean Monthly Return', 'Value': f"{stats['mean_monthly_return']:.6f} ({stats['mean_monthly_return']*100:.2f}%)"},
{'Metric': 'Monthly Volatility', 'Value': f"{stats['monthly_volatility']:.6f} ({stats['monthly_volatility']*100:.2f}%)"},
{'Metric': 'Annualized Return', 'Value': f"{stats['annualized_return']:.6f} ({stats['annualized_return']*100:.2f}%)"},
{'Metric': 'Annualized Volatility', 'Value': f"{stats['annualized_volatility']:.6f} ({stats['annualized_volatility']*100:.2f}%)"},
]
summary_df = pd.DataFrame(summary_rows)
summary_path = data_dir / "summary.csv"
summary_df.to_csv(summary_path, index=False)
# Also save to latest
summary_df.to_csv(latest_dir / "summary.csv", index=False)
# 3. Save historical returns used for the simulation
historical_rows = []
for _, row in portfolio_data.iterrows():
historical_rows.append({
'Month': row['month'],
'Beginning_Balance_CAD': row['beginning_balance'],
'Ending_Balance_CAD': row['ending_balance'],
'Net_Cash_Flow_CAD': row['net_cash_flow'],
'Return': row['returns'],
'Return_Percent': f"{row['returns']*100:.2f}%"
})
historical_df = pd.DataFrame(historical_rows)
historical_path = data_dir / "historical_returns.csv"
historical_df.to_csv(historical_path, index=False)
# Also save to latest
historical_df.to_csv(latest_dir / "historical_returns.csv", index=False)
return {
"projections": str(projections_path),
"summary": str(summary_path),
"historical_returns": str(historical_path)
}
def _simulate_paths_with_stress(
self,
initial_value: float,
mean: float,
volatility: float,
n_simulations: int,
n_months: int,
stress_config: dict
) -> np.ndarray:
"""Generate simulation paths with stress events at random times.
Args:
initial_value: Starting portfolio value
mean: Mean monthly return
volatility: Monthly volatility
n_simulations: Number of simulation paths
n_months: Number of months to project
stress_config: Stress scenario configuration
Returns:
Array of shape (n_simulations, n_months + 1) with portfolio values
"""
# Initialize paths
paths = np.zeros((n_simulations, n_months + 1))
paths[:, 0] = initial_value
# For each simulation, randomly choose when stress event occurs
for sim in range(n_simulations):
# Random start time for stress event (not in first or last month)
stress_start = np.random.randint(1, max(2, n_months - 1))
# Generate returns for this path
for t in range(n_months):
current_month = t + 1
if stress_config['type'] == 'crash':
# Market crash: sudden drop at stress_start
if current_month == stress_start:
# Apply crash
random_return = stress_config['magnitude']
else:
# Normal return
random_return = np.random.normal(mean, volatility)
elif stress_config['type'] == 'bear':
# Bear market: negative returns for duration_months
duration = stress_config['duration_months']
if stress_start <= current_month < stress_start + duration:
# During bear market
random_return = stress_config['monthly_return']
else:
# Normal return
random_return = np.random.normal(mean, volatility)
elif stress_config['type'] == 'volatility':
# Volatility spike: higher volatility for duration_months
duration = stress_config['duration_months']
if stress_start <= current_month < stress_start + duration:
# During volatility spike
stressed_vol = volatility * stress_config['multiplier']
random_return = np.random.normal(mean, stressed_vol)
else:
# Normal return
random_return = np.random.normal(mean, volatility)
else:
# Unknown stress type, use normal return
random_return = np.random.normal(mean, volatility)
# Apply return
paths[sim, t + 1] = paths[sim, t] * (1 + random_return)
return paths
def _generate_stress_test_visualizations(
    self,
    baseline_paths: np.ndarray,
    stressed_results: dict,
    baseline_percentiles: list[dict],
    initial_value: float,
    timestamp: str
) -> dict[str, str]:
    """Create comparison visualizations for stress test.

    Generates three charts, each written to a timestamped directory and
    mirrored into a "latest" directory:
      1. comparison_chart.png - baseline confidence band with every stressed
         scenario's median path overlaid.
      2. distributions.png - histograms of final portfolio values for the
         baseline and each stressed scenario.
      3. impact_summary.png - final-year median vs 10th-percentile outcomes
         per scenario.

    Args:
        baseline_paths: Baseline simulation paths, shape (n_simulations, n_months + 1)
        stressed_results: Mapping of scenario key -> result dict with 'paths',
            'config', and 'percentiles' entries
        baseline_percentiles: Baseline percentile data, one entry per year
        initial_value: Initial portfolio value
        timestamp: Timestamp string used for the output directory name

    Returns:
        Dictionary with paths to the generated chart files
    """
    # Create output directories (timestamped + rolling "latest")
    charts_dir = self.reports_path / "stress_test" / timestamp
    charts_dir.mkdir(parents=True, exist_ok=True)
    latest_dir = self.reports_path / "stress_test" / "latest"
    latest_dir.mkdir(parents=True, exist_ok=True)

    def save_both(filename: str):
        """Save the current figure to both directories; return the primary path."""
        path = charts_dir / filename
        plt.savefig(path, dpi=300, bbox_inches='tight')
        plt.savefig(latest_dir / filename, dpi=300, bbox_inches='tight')
        plt.close()
        return path

    n_months = baseline_paths.shape[1] - 1
    months = np.arange(0, n_months + 1)
    colors = {'market_crash': 'red', 'bear_market': 'orange', 'volatility_spike': 'purple'}
    # Label the final year from the data instead of hard-coding "Year 5",
    # so titles stay correct for any projection horizon.
    final_year = baseline_percentiles[-1]['year']

    # 1. Comparison fan chart - all scenarios overlaid on the baseline band
    _, ax = plt.subplots(figsize=(14, 8))
    baseline_p50 = np.percentile(baseline_paths, 50, axis=0)
    ax.plot(months, baseline_p50, color='green', linewidth=2, label='Baseline Median')
    baseline_p10 = np.percentile(baseline_paths, 10, axis=0)
    baseline_p90 = np.percentile(baseline_paths, 90, axis=0)
    ax.fill_between(months, baseline_p10, baseline_p90, alpha=0.2, color='green',
                    label='Baseline 10th-90th percentile')
    for scenario_key, result in stressed_results.items():
        p50 = np.percentile(result['paths'], 50, axis=0)
        label = result['config']['name']
        ax.plot(months, p50, color=colors.get(scenario_key, 'gray'), linewidth=2,
                linestyle='--', label=f'{label} Median')
    ax.axhline(y=initial_value, color='black', linestyle=':', linewidth=1, label='Initial Value')
    ax.set_xlabel('Months', fontsize=12)
    ax.set_ylabel('Portfolio Value (CAD)', fontsize=12)
    ax.set_title('Stress Test Comparison - All Scenarios', fontsize=14, fontweight='bold')
    ax.legend(loc='upper left')
    ax.grid(True, alpha=0.3)
    ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'${x:,.0f}'))
    plt.tight_layout()
    comparison_path = save_both("comparison_chart.png")

    # 2. Final-year distribution comparison (baseline + one panel per scenario)
    def plot_distribution(panel, values, bar_color, median_color, title):
        """Histogram of final values with a dashed line at the median."""
        panel.hist(values, bins=50, alpha=0.7, color=bar_color, edgecolor='black')
        panel.axvline(x=np.median(values), color=median_color, linestyle='--', linewidth=2)
        panel.set_title(title, fontsize=12, fontweight='bold')
        panel.set_xlabel('Final Portfolio Value (CAD)', fontsize=10)
        panel.set_ylabel('Frequency', fontsize=10)
        panel.xaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'${x/1e6:.1f}M'))

    # Size the grid to the scenario count instead of assuming exactly three
    # stressed scenarios (a fixed 2x2 grid raises IndexError for more and
    # leaves blank visible axes for fewer).
    n_panels = 1 + len(stressed_results)
    n_rows = (n_panels + 1) // 2
    _, axes = plt.subplots(n_rows, 2, figsize=(14, 5 * n_rows))
    axes = np.atleast_1d(axes).flatten()
    plot_distribution(axes[0], baseline_paths[:, -1], 'green', 'darkgreen', 'Baseline')
    for idx, (scenario_key, result) in enumerate(stressed_results.items(), 1):
        plot_distribution(axes[idx], result['paths'][:, -1],
                          colors.get(scenario_key, 'gray'), 'darkred',
                          result['config']['name'])
    for spare in axes[n_panels:]:
        spare.set_visible(False)  # hide any leftover grid cells
    plt.suptitle(f'Year {final_year} Portfolio Value Distributions', fontsize=14, fontweight='bold')
    plt.tight_layout()
    distribution_path = save_both("distributions.png")

    # 3. Impact summary bar chart - median vs conservative outcome per scenario
    _, ax = plt.subplots(figsize=(12, 7))
    scenarios = ['Baseline'] + [result['config']['name'] for result in stressed_results.values()]
    final_medians = [baseline_percentiles[-1]['percentiles']['p50']]
    final_p10 = [baseline_percentiles[-1]['percentiles']['p10']]
    for result in stressed_results.values():
        final_medians.append(result['percentiles'][-1]['percentiles']['p50'])
        final_p10.append(result['percentiles'][-1]['percentiles']['p10'])
    x = np.arange(len(scenarios))
    width = 0.35
    ax.bar(x - width/2, final_medians, width, label='Median (50th percentile)', color='blue', alpha=0.7)
    ax.bar(x + width/2, final_p10, width, label='Conservative (10th percentile)', color='red', alpha=0.7)
    ax.set_xlabel('Scenario', fontsize=12)
    ax.set_ylabel('Portfolio Value (CAD)', fontsize=12)
    ax.set_title(f'Year {final_year} Outcomes by Scenario', fontsize=14, fontweight='bold')
    ax.set_xticks(x)
    ax.set_xticklabels(scenarios, rotation=15, ha='right')
    ax.legend()
    ax.grid(True, alpha=0.3, axis='y')
    ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'${x:,.0f}'))
    plt.tight_layout()
    impact_path = save_both("impact_summary.png")

    return {
        "comparison_chart": str(comparison_path),
        "distributions": str(distribution_path),
        "impact_summary": str(impact_path)
    }
def _save_stress_test_data(
self,
baseline_percentiles: list[dict],
stressed_results: dict,
stats: dict,
initial_value: float,
n_simulations: int,
projection_years: int,
timestamp: str
) -> dict[str, str]:
"""Save stress test data to CSV files.
Args:
baseline_percentiles: Baseline percentile projections
stressed_results: Stressed simulation results
stats: Input statistics
initial_value: Initial portfolio value
n_simulations: Number of simulations
projection_years: Years projected
timestamp: Timestamp for directory naming
Returns:
Dictionary with paths to generated data files
"""
data_dir = self.reports_path / "stress_test" / timestamp
data_dir.mkdir(parents=True, exist_ok=True)
latest_dir = self.reports_path / "stress_test" / "latest"
latest_dir.mkdir(parents=True, exist_ok=True)
# 1. Comparison table - Year by year for all scenarios
comparison_rows = []
for year_idx in range(projection_years):
baseline_year = baseline_percentiles[year_idx]
row = {
'Year': baseline_year['year'],
'Baseline_P10': baseline_year['percentiles']['p10'],
'Baseline_P50': baseline_year['percentiles']['p50'],
'Baseline_P90': baseline_year['percentiles']['p90']
}
for scenario_key, result in stressed_results.items():
scenario_name = result['config']['name'].replace(' ', '_')
scenario_year = result['percentiles'][year_idx]
row[f'{scenario_name}_P10'] = scenario_year['percentiles']['p10']
row[f'{scenario_name}_P50'] = scenario_year['percentiles']['p50']
row[f'{scenario_name}_P90'] = scenario_year['percentiles']['p90']
comparison_rows.append(row)
comparison_df = pd.DataFrame(comparison_rows)
comparison_path = data_dir / "scenario_comparison.csv"
comparison_df.to_csv(comparison_path, index=False)
comparison_df.to_csv(latest_dir / "scenario_comparison.csv", index=False)
# 2. Impact summary - % decline from baseline
impact_rows = []
for scenario_key, result in stressed_results.items():
for year_idx in range(projection_years):
baseline_p50 = baseline_percentiles[year_idx]['percentiles']['p50']
stressed_p50 = result['percentiles'][year_idx]['percentiles']['p50']
decline_pct = ((stressed_p50 - baseline_p50) / baseline_p50) * 100
impact_rows.append({
'Scenario': result['config']['name'],
'Year': year_idx + 1,
'Baseline_Median': baseline_p50,
'Stressed_Median': stressed_p50,
'Decline_Amount': stressed_p50 - baseline_p50,
'Decline_Percent': round(decline_pct, 2)
})
impact_df = pd.DataFrame(impact_rows)
impact_path = data_dir / "impact_analysis.csv"
impact_df.to_csv(impact_path, index=False)
impact_df.to_csv(latest_dir / "impact_analysis.csv", index=False)
# 3. Summary metadata
summary_rows = [
{'Metric': 'Simulation Date', 'Value': datetime.now().isoformat()},
{'Metric': 'Number of Simulations', 'Value': n_simulations},
{'Metric': 'Projection Years', 'Value': projection_years},
{'Metric': 'Initial Portfolio Value (CAD)', 'Value': f'${initial_value:,.2f}'},
{'Metric': '', 'Value': ''},
{'Metric': 'Mean Monthly Return', 'Value': f"{stats['mean_monthly_return']:.6f} ({stats['mean_monthly_return']*100:.2f}%)"},
{'Metric': 'Monthly Volatility', 'Value': f"{stats['monthly_volatility']:.6f} ({stats['monthly_volatility']*100:.2f}%)"},
{'Metric': '', 'Value': ''},
{'Metric': 'Stress Scenarios', 'Value': ''},
]
for scenario_key, result in stressed_results.items():
summary_rows.append({
'Metric': f" {result['config']['name']}",
'Value': result['config']['description']
})
summary_df = pd.DataFrame(summary_rows)
summary_path = data_dir / "summary.csv"
summary_df.to_csv(summary_path, index=False)
summary_df.to_csv(latest_dir / "summary.csv", index=False)
return {
"scenario_comparison": str(comparison_path),
"impact_analysis": str(impact_path),
"summary": str(summary_path)
}