"""Risk calculation tools for portfolio analysis."""
import math
from datetime import datetime
from typing import Optional
from ..database.sqlite_client import SQLiteClient
class RiskCalculator:
"""Calculate risk metrics from historical performance data."""
def __init__(self, db_client: SQLiteClient):
"""Initialize risk calculator.
Args:
db_client: SQLite database client
"""
self.db = db_client
# Risk-free rate assumption (can be updated)
self.risk_free_rate = 0.04 # 4% annual
async def calculate_risk_metrics(
self,
account_number: str,
period_months: int = 12,
benchmark_returns: Optional[list[float]] = None
) -> dict:
"""Calculate risk metrics for an account.
Requires at least 3 months of historical data for meaningful results.
12 months recommended for annual metrics.
Args:
account_number: Account number
period_months: Number of months to use for calculation
benchmark_returns: Optional list of benchmark monthly returns for beta calculation
Returns:
Dictionary of risk metrics
"""
# Get historical performance data
history = await self.db.get_performance_history(account_number, limit=period_months)
if len(history) < 3:
return {
"error": f"Insufficient data. Need at least 3 months, have {len(history)}",
"account_number": account_number,
}
# Extract monthly returns (using month-to-date returns)
returns = []
balances = []
for record in history:
if record.get("return_mtd") is not None:
returns.append(record["return_mtd"])
if record.get("current_balance_cad") is not None:
balances.append(record["current_balance_cad"])
if len(returns) < 3:
# Try to calculate returns from balances if MTD not available
if len(balances) >= 3:
returns = self._calculate_returns_from_balances(balances)
else:
return {
"error": "Insufficient return data for calculations",
"account_number": account_number,
}
# Calculate metrics
metrics = {
"account_number": account_number,
"calculation_date": datetime.now().isoformat(),
"period_months": len(returns),
"volatility": self._calculate_volatility(returns),
"sharpe_ratio": self._calculate_sharpe_ratio(returns),
"max_drawdown": self._calculate_max_drawdown(balances) if balances else None,
"var_95": self._calculate_var(returns, confidence=0.95),
}
# Calculate beta if benchmark returns provided
if benchmark_returns and len(benchmark_returns) >= len(returns):
metrics["beta"] = self._calculate_beta(returns, benchmark_returns[:len(returns)])
else:
metrics["beta"] = None
# Store metrics in database
await self.db.insert_risk_metrics(metrics)
return metrics
def _calculate_returns_from_balances(self, balances: list[float]) -> list[float]:
"""Calculate monthly returns from balance history.
Args:
balances: List of balances (most recent first)
Returns:
List of monthly returns (percentages)
"""
returns = []
# Reverse to go from oldest to newest
balances_ordered = list(reversed(balances))
for i in range(1, len(balances_ordered)):
if balances_ordered[i - 1] != 0:
monthly_return = (
(balances_ordered[i] - balances_ordered[i - 1])
/ balances_ordered[i - 1] * 100
)
returns.append(monthly_return)
return returns
def _calculate_volatility(self, returns: list[float]) -> float:
"""Calculate annualized volatility (standard deviation of returns).
Args:
returns: List of monthly returns (percentages)
Returns:
Annualized volatility as percentage
"""
if len(returns) < 2:
return 0.0
# Calculate mean
mean_return = sum(returns) / len(returns)
# Calculate variance
variance = sum((r - mean_return) ** 2 for r in returns) / (len(returns) - 1)
# Standard deviation
monthly_vol = math.sqrt(variance)
# Annualize (multiply by sqrt(12) for monthly data)
annualized_vol = monthly_vol * math.sqrt(12)
return round(annualized_vol, 2)
def _calculate_sharpe_ratio(self, returns: list[float]) -> float:
"""Calculate Sharpe ratio.
Sharpe = (Portfolio Return - Risk Free Rate) / Portfolio Volatility
Args:
returns: List of monthly returns (percentages)
Returns:
Sharpe ratio
"""
if len(returns) < 2:
return 0.0
# Calculate annualized return
mean_monthly = sum(returns) / len(returns)
annualized_return = mean_monthly * 12
# Calculate annualized volatility
volatility = self._calculate_volatility(returns)
if volatility == 0:
return 0.0
# Sharpe ratio (risk-free rate is already annual)
sharpe = (annualized_return - self.risk_free_rate * 100) / volatility
return round(sharpe, 2)
def _calculate_max_drawdown(self, balances: list[float]) -> float:
"""Calculate maximum drawdown.
Max drawdown is the largest peak-to-trough decline.
Args:
balances: List of balances (most recent first)
Returns:
Maximum drawdown as percentage (negative number)
"""
if len(balances) < 2:
return 0.0
# Reverse to go chronologically
balances_ordered = list(reversed(balances))
peak = balances_ordered[0]
max_drawdown = 0.0
for balance in balances_ordered:
if balance > peak:
peak = balance
if peak > 0:
drawdown = (balance - peak) / peak * 100
if drawdown < max_drawdown:
max_drawdown = drawdown
return round(max_drawdown, 2)
def _calculate_beta(
self, portfolio_returns: list[float], benchmark_returns: list[float]
) -> float:
"""Calculate portfolio beta vs benchmark.
Beta = Covariance(Portfolio, Benchmark) / Variance(Benchmark)
Args:
portfolio_returns: List of portfolio monthly returns
benchmark_returns: List of benchmark monthly returns
Returns:
Beta coefficient
"""
n = min(len(portfolio_returns), len(benchmark_returns))
if n < 3:
return 1.0 # Default to market beta
port = portfolio_returns[:n]
bench = benchmark_returns[:n]
# Calculate means
port_mean = sum(port) / n
bench_mean = sum(bench) / n
# Calculate covariance and benchmark variance
covariance = sum(
(port[i] - port_mean) * (bench[i] - bench_mean)
for i in range(n)
) / (n - 1)
bench_variance = sum(
(bench[i] - bench_mean) ** 2
for i in range(n)
) / (n - 1)
if bench_variance == 0:
return 1.0
beta = covariance / bench_variance
return round(beta, 2)
def _calculate_var(self, returns: list[float], confidence: float = 0.95) -> float:
"""Calculate Value at Risk using historical method.
VaR represents the maximum expected loss at a given confidence level.
Args:
returns: List of monthly returns (percentages)
confidence: Confidence level (0.95 = 95%)
Returns:
VaR as percentage (negative number)
"""
if len(returns) < 3:
return 0.0
# Sort returns ascending
sorted_returns = sorted(returns)
# Find the return at the (1 - confidence) percentile
index = int(len(sorted_returns) * (1 - confidence))
# VaR is the return at this percentile
var = sorted_returns[index] if index < len(sorted_returns) else sorted_returns[0]
return round(var, 2)
async def get_stored_metrics(self, account_number: str) -> Optional[dict]:
"""Get previously calculated risk metrics.
Args:
account_number: Account number
Returns:
Risk metrics or None
"""
metrics = await self.db.get_risk_metrics(account_number, limit=1)
return metrics[0] if metrics else None