Skip to main content
Glama
emerzon

MetaTrader5 MCP Server

by emerzon
volatility.py62.6 kB
from typing import Any, Dict, Optional, List, Literal from datetime import datetime import numpy as np import pandas as pd import MetaTrader5 as mt5 import json import math from ..core.constants import TIMEFRAME_MAP, TIMEFRAME_SECONDS from ..core.schema import TimeframeLiteral, DenoiseSpec from ..utils.mt5 import _mt5_epoch_to_utc, _mt5_copy_rates_from, _ensure_symbol_ready from ..utils.utils import _parse_start_datetime as _parse_start_datetime_util from ..utils.denoise import _apply_denoise, normalize_denoise_spec as _normalize_denoise_spec from .common import default_seasonality as _default_seasonality_period, pd_freq_from_timeframe as _pd_freq_from_timeframe # Optional availability flags (match server discovery) try: from statsmodels.tsa.holtwinters import ExponentialSmoothing as _ETS # type: ignore _SM_ETS_AVAILABLE = True except Exception: _SM_ETS_AVAILABLE = False try: from statsmodels.tsa.statespace.sarimax import SARIMAX as _SARIMAX # type: ignore _SM_SARIMAX_AVAILABLE = True except Exception: _SM_SARIMAX_AVAILABLE = False try: import importlib.util as _importlib_util _NF_AVAILABLE = _importlib_util.find_spec("neuralforecast") is not None _MLF_AVAILABLE = _importlib_util.find_spec("mlforecast") is not None except Exception: _NF_AVAILABLE = False _MLF_AVAILABLE = False try: from arch import arch_model as _arch_model # type: ignore _ARCH_AVAILABLE = True except Exception: _ARCH_AVAILABLE = False # Use shared helpers def get_volatility_methods_data() -> Dict[str, Any]: """Return metadata about available volatility forecasting methods and their parameters.""" methods: List[Dict[str, Any]] = [] methods.append({ "method": "ewma", "available": True, "requires": [], "description": "Exponentially weighted moving variance (RiskMetrics-style).", "params": [ {"name": "lookback", "type": "int", "default": 1500, "description": "Number of past returns used in the EWMA."}, {"name": "lambda_", "type": "float", "default": 0.94, "description": "Decay factor for the EWMA weights."}, {"name": "halflife", "type": "int", "default": None, "description": "Optional half-life (in bars) used to derive lambda; overrides lambda_ when provided."}, ], }) for name, desc in ( ("parkinson", "Parkinson high-low range estimator."), ("gk", "Garman-Klass OHLC estimator."), ("rs", "Rogers-Satchell OHLC estimator."), ("yang_zhang", "Yang-Zhang estimator combining overnight jumps and ranges."), ("rolling_std", "Rolling standard deviation of simple returns."), ): methods.append({ "method": name, "available": True, "requires": [], "description": desc, "params": [ {"name": "window", "type": "int", "default": 20, "description": "Number of bars in the rolling window."}, ], }) methods.append({ "method": "realized_kernel", "available": True, "requires": [], "description": "Realized kernel variance with configurable kernel and bandwidth.", "params": [ {"name": "window", "type": "int", "default": 50, "description": "Number of bars of returns fed to the kernel."}, {"name": "kernel", "type": "str", "default": "tukey_hanning", "description": "Kernel name (tukey_hanning, bartlett, parzen, triangular)."}, {"name": "bandwidth", "type": "int", "default": None, "description": "Optional kernel bandwidth; auto-selected when omitted."}, ], }) methods.append({ "method": "har_rv", "available": True, "requires": [], "description": "HAR-RV model on realized variance aggregated from intraday bars.", "params": [ {"name": "rv_timeframe", "type": "str", "default": "M5", "description": "Timeframe used to build intraday realized variance."}, {"name": "days", "type": "int", "default": 120, "description": "Number of calendar days fetched for the HAR fit."}, {"name": "window_w", "type": "int", "default": 5, "description": "Weekly window size for HAR lags."}, {"name": "window_m", "type": "int", "default": 22, "description": "Monthly window size for HAR lags."}, ], }) def _garch_entry(name: str, base_desc: str, dist_default: str = "normal") -> Dict[str, Any]: params = [ {"name": "fit_bars", "type": "int", "default": 2000, "description": "Number of recent returns used to fit the ARCH model."}, {"name": "p", "type": "int", "default": 1, "description": "ARCH order (p)."}, {"name": "q", "type": "int", "default": 1, "description": "GARCH order (q)."}, {"name": "mean", "type": "str", "default": "Zero", "description": "Mean model ('Zero' or 'Constant')."}, {"name": "dist", "type": "str", "default": dist_default, "description": "Innovation distribution (normal, studentst, skewt, etc.)."}, ] if "gjr" in name: params.append({"name": "o", "type": "int", "default": 1, "description": "Asymmetry (leverage) order for GJR-GARCH."}) return { "method": name, "available": _ARCH_AVAILABLE, "requires": [] if _ARCH_AVAILABLE else ["arch"], "description": base_desc, "params": params, } methods.extend([ _garch_entry("garch", "GARCH volatility model (ARCH package)."), _garch_entry("egarch", "Exponential GARCH volatility model."), _garch_entry("gjr_garch", "GJR-GARCH with leverage effects."), _garch_entry("garch_t", "GARCH with Student-t innovations.", dist_default="studentst"), _garch_entry("egarch_t", "EGARCH with Student-t innovations.", dist_default="studentst"), _garch_entry("gjr_garch_t", "GJR-GARCH with Student-t innovations.", dist_default="studentst"), _garch_entry("figarch", "Fractionally integrated FIGARCH volatility model."), ]) methods.append({ "method": "arima", "available": _SM_SARIMAX_AVAILABLE, "requires": [] if _SM_SARIMAX_AVAILABLE else ["statsmodels"], "description": "ARIMA model fitted to the volatility proxy series.", "params": [ {"name": "p", "type": "int", "default": 1, "description": "Non-seasonal AR order."}, {"name": "d", "type": "int", "default": 0, "description": "Non-seasonal differencing order."}, {"name": "q", "type": "int", "default": 1, "description": "Non-seasonal MA order."}, ], }) methods.append({ "method": "sarima", "available": _SM_SARIMAX_AVAILABLE, "requires": [] if _SM_SARIMAX_AVAILABLE else ["statsmodels"], "description": "Seasonal ARIMA on the volatility proxy with automatic seasonal period by timeframe.", "params": [ {"name": "p", "type": "int", "default": 1, "description": "AR order."}, {"name": "d", "type": "int", "default": 0, "description": "Differencing order."}, {"name": "q", "type": "int", "default": 1, "description": "MA order."}, {"name": "P", "type": "int", "default": 0, "description": "Seasonal AR order."}, {"name": "D", "type": "int", "default": 0, "description": "Seasonal differencing order."}, {"name": "Q", "type": "int", "default": 0, "description": "Seasonal MA order."}, ], }) methods.append({ "method": "ets", "available": _SM_ETS_AVAILABLE, "requires": [] if _SM_ETS_AVAILABLE else ["statsmodels"], "description": "Exponential smoothing (ETS) on the volatility proxy.", "params": [], }) methods.append({ "method": "theta", "available": True, "requires": [], "description": "Theta method applied to the volatility proxy.", "params": [ {"name": "alpha", "type": "float", "default": 0.2, "description": "Level smoothing coefficient."}, ], }) methods.append({ "method": "mlf_rf", "available": _MLF_AVAILABLE, "requires": [] if _MLF_AVAILABLE else ["mlforecast", "scikit-learn"], "description": "Random forest regression on lagged volatility proxy features (mlforecast).", "params": [ {"name": "lags", "type": "list[int]", "default": [1, 2, 3, 4, 5], "description": "Autoregressive lags supplied to the regressor."}, {"name": "n_estimators", "type": "int", "default": 200, "description": "Number of trees in the random forest."}, ], }) methods.append({ "method": "nhits", "available": _NF_AVAILABLE, "requires": [] if _NF_AVAILABLE else ["neuralforecast[torch]"], "description": "NeuralForecast NHITS model on the volatility proxy.", "params": [ {"name": "max_epochs", "type": "int", "default": 30, "description": "Training epochs for NHITS."}, {"name": "batch_size", "type": "int", "default": 32, "description": "Mini-batch size."}, {"name": "input_size", "type": "int", "default": None, "description": "Lookback window (auto when omitted)."}, ], }) methods.append({ "method": "ensemble", "available": True, "requires": [], "description": "Blend of multiple price/return forecast methods applied to the volatility proxy.", "params": [ {"name": "methods", "type": "list[str]", "default": [], "description": "Base forecast methods to blend (leave blank for defaults)."}, {"name": "aggregator", "type": "str", "default": "mean", "description": "Aggregation strategy: mean, median, weighted."}, {"name": "weights", "type": "list[float]", "default": [], "description": "Optional weights for the weighted aggregator."}, {"name": "expose_components", "type": "bool", "default": True, "description": "Expose individual component forecasts in the response."}, ], }) return {"methods": methods} # Use shared helpers from common.py for seasonality and pandas freq mapping def _bars_per_year(timeframe: str) -> int: """Approximate number of bars per year for a given timeframe. Uses 365 days; for intraday frames computes 365*24*3600 / seconds_per_bar. """ try: secs = TIMEFRAME_SECONDS.get(timeframe) if not secs or secs <= 0: return 0 return int(round((365.0 * 24.0 * 3600.0) / float(secs))) except Exception: return 0 # --- Range-based variance helpers ------------------------------------------------- def _parkinson_sigma_sq(high: np.ndarray, low: np.ndarray) -> np.ndarray: eps = 1e-12 h = np.asarray(high, dtype=float) l = np.asarray(low, dtype=float) with np.errstate(divide='ignore', invalid='ignore'): x = np.log(np.maximum(h, eps)) - np.log(np.maximum(l, eps)) const = 1.0 / (4.0 * math.log(2.0)) v = const * (x * x) v[~np.isfinite(v)] = np.nan return np.maximum(v, 0.0) def _garman_klass_sigma_sq(open_: np.ndarray, high: np.ndarray, low: np.ndarray, close: np.ndarray) -> np.ndarray: eps = 1e-12 o = np.asarray(open_, dtype=float) h = np.asarray(high, dtype=float) l = np.asarray(low, dtype=float) c = np.asarray(close, dtype=float) with np.errstate(divide='ignore', invalid='ignore'): hl = np.log(np.maximum(h, eps)) - np.log(np.maximum(l, eps)) co = np.log(np.maximum(c, eps)) - np.log(np.maximum(o, eps)) v = 0.5 * (hl * hl) - (2.0 * math.log(2.0) - 1.0) * (co * co) v[~np.isfinite(v)] = np.nan return np.maximum(v, 0.0) def _rogers_satchell_sigma_sq(open_: np.ndarray, high: np.ndarray, low: np.ndarray, close: np.ndarray) -> np.ndarray: eps = 1e-12 o = np.asarray(open_, dtype=float) h = np.asarray(high, dtype=float) l = np.asarray(low, dtype=float) c = np.asarray(close, dtype=float) with np.errstate(divide='ignore', invalid='ignore'): term1 = (np.log(np.maximum(h, eps)) - np.log(np.maximum(c, eps))) * (np.log(np.maximum(h, eps)) - np.log(np.maximum(o, eps))) term2 = (np.log(np.maximum(l, eps)) - np.log(np.maximum(c, eps))) * (np.log(np.maximum(l, eps)) - np.log(np.maximum(o, eps))) rs = term1 + term2 rs[~np.isfinite(rs)] = np.nan return np.maximum(rs, 0.0) def _kernel_weight(kind: str, h: int, bandwidth: int) -> float: if bandwidth <= 0: return 0.0 x = float(h) / float(bandwidth + 1) x = max(0.0, min(1.0, x)) k = kind.lower() if k in {"bartlett", "triangular"}: return float(1.0 - x) if k in {"parzen", "parzen_bartlett"}: if x <= 0.5: return float(1.0 - 6.0 * x * x + 6.0 * x * x * x) if x <= 1.0: return float(2.0 * (1.0 - x) ** 3) return 0.0 # Tukey-Hanning (default) return float(0.5 * (1.0 + math.cos(math.pi * x))) if x <= 1.0 else 0.0 def _realized_kernel_variance( returns: np.ndarray, bandwidth: Optional[int] = None, kernel: str = "tukey_hanning", ) -> float: """Compute realized kernel variance estimate for a return series.""" r = np.asarray(returns, dtype=float) r = r[np.isfinite(r)] n = int(r.size) if n < 3: return float('nan') if bandwidth is None: bandwidth = max(1, int(np.floor(np.sqrt(n)))) bandwidth = int(max(1, min(bandwidth, n - 1))) r_centered = r - float(np.mean(r)) gamma0 = float(np.dot(r_centered, r_centered)) rk = gamma0 for h in range(1, bandwidth + 1): cov = float(np.dot(r_centered[h:], r_centered[:-h])) weight = _kernel_weight(kernel, h, bandwidth) rk += 2.0 * weight * cov rk = max(rk, 0.0) return float(rk / max(1, n)) def forecast_volatility( symbol: str, timeframe: TimeframeLiteral = "H1", horizon: int = 1, method: Literal['ewma','parkinson','gk','rs','yang_zhang','rolling_std','realized_kernel','har_rv','garch','egarch','gjr_garch','garch_t','egarch_t','gjr_garch_t','figarch','arima','sarima','ets','theta'] = 'ewma', # type: ignore proxy: Optional[Literal['squared_return','abs_return','log_r2']] = None, # type: ignore params: Optional[Dict[str, Any]] = None, as_of: Optional[str] = None, denoise: Optional[DenoiseSpec] = None, ) -> Dict[str, Any]: """Forecast volatility over `horizon` bars with direct estimators/GARCH or general forecasters on a proxy. Direct: ewma, parkinson, gk, rs, yang_zhang, rolling_std, realized_kernel, har_rv, garch(+variants). General: arima, sarima, ets, theta (require `proxy`: squared_return|abs_return|log_r2). """ try: if timeframe not in TIMEFRAME_MAP: return {"error": f"Invalid timeframe: {timeframe}. Valid options: {list(TIMEFRAME_MAP.keys())}"} mt5_tf = TIMEFRAME_MAP[timeframe] tf_secs = TIMEFRAME_SECONDS.get(timeframe) if not tf_secs: return {"error": f"Unsupported timeframe seconds for {timeframe}"} method_l = str(method).lower().strip() garch_family = {'garch','egarch','gjr_garch','garch_t','egarch_t','gjr_garch_t','figarch'} valid_direct = {'ewma','parkinson','gk','rs','yang_zhang','rolling_std','realized_kernel','har_rv'} | garch_family valid_general = {'arima','sarima','ets','theta'} if method_l not in valid_direct.union(valid_general): return {"error": f"Invalid method: {method}"} if method_l in garch_family and not _ARCH_AVAILABLE: return {"error": f"{method_l} requires 'arch' package."} # Parse method params: accept dict, JSON string, or k=v pairs __stage = 'parse_params' if isinstance(params, dict): p = dict(params) elif isinstance(params, str): s = params.strip() if (s.startswith('{') and s.endswith('}')): try: p = json.loads(s) except Exception: # Fallback to colon or equals pairs within braces p = {} toks = [tok for tok in s.strip().strip('{}').split() if tok] i = 0 while i < len(toks): tok = toks[i].strip().strip(',') if not tok: i += 1; continue if '=' in tok: k, v = tok.split('=', 1) p[k.strip()] = v.strip().strip(',') i += 1; continue if tok.endswith(':'): key = tok[:-1].strip() val = '' if i + 1 < len(toks): val = toks[i+1].strip().strip(',') i += 2 else: i += 1 p[key] = val continue i += 1 else: # Parse simple k=v pairs separated by comma/space p = {} for tok in s.split(): t = tok.strip().strip(',') if '=' in t: k, v = t.split('=', 1) p[k.strip()] = v.strip() # ignore stray tokens without '=' else: p = {} # If using general forecasters on proxy, compute proxy series and return using internal logic if method_l in valid_general: # Fetch recent closes and build returns # Reuse unified forecast branch for fetching by delegating to data_fetch_candles/forecast_generate where possible is heavy; implement lightweight here # Determine lookback bars need = max(300, int(horizon) + 50) _info_before = mt5.symbol_info(symbol) _was_visible = bool(_info_before.visible) if _info_before is not None else None err = _ensure_symbol_ready(symbol) if err: return {"error": err} try: if as_of: to_dt = _parse_start_datetime_util(as_of) if not to_dt: return {"error": "Invalid as_of time."} rates = _mt5_copy_rates_from(symbol, mt5_tf, to_dt, need) else: _tick = mt5.symbol_info_tick(symbol) if _tick is not None and getattr(_tick, 'time', None): t_utc = _mt5_epoch_to_utc(float(_tick.time)) server_now_dt = datetime.utcfromtimestamp(t_utc) else: server_now_dt = datetime.utcnow() rates = _mt5_copy_rates_from(symbol, mt5_tf, server_now_dt, need) finally: if _was_visible is False: try: mt5.symbol_select(symbol, False) except Exception: pass if rates is None or len(rates) < 5: return {"error": f"Failed to get sufficient rates for {symbol}: {mt5.last_error()}"} df = pd.DataFrame(rates) if as_of is None and len(df) >= 2: df = df.iloc[:-1] if len(df) < 5: return {"error": "Not enough closed bars"} if denoise: _apply_denoise(df, denoise, default_when='pre_ti') with np.errstate(divide='ignore', invalid='ignore'): r = np.diff(np.log(np.maximum(df['close'].astype(float).to_numpy(), 1e-12))) r = r[np.isfinite(r)] if r.size < 10: return {"error": "Insufficient returns to estimate volatility proxy"} # Build proxy if not proxy: return {"error": "General methods require 'proxy' (squared_return|abs_return|log_r2)"} proxy_l = str(proxy).lower().strip() eps = 1e-12 if proxy_l == 'squared_return': y = r * r; back = 'sqrt' elif proxy_l == 'abs_return': y = np.abs(r); back = 'abs' elif proxy_l == 'log_r2': y = np.log(r * r + eps); back = 'exp_sqrt' else: return {"error": f"Unsupported proxy: {proxy}"} y = y[np.isfinite(y)] fh = int(horizon) # Fit general model if method_l in {'arima','sarima'}: if not _SM_SARIMAX_AVAILABLE: return {"error": "ARIMA/SARIMA require statsmodels"} ord_p = int(p.get('p',1)); ord_d = int(p.get('d',0)); ord_q = int(p.get('q',1)) if method_l == 'sarima': m = _default_seasonality_period(timeframe) seas = (int(p.get('P',0)), int(p.get('D',0)), int(p.get('Q',0)), int(m) if m>=2 else 0) else: seas = (0,0,0,0) try: endog = pd.Series(y.astype(float)) model = _SARIMAX(endog, order=(ord_p,ord_d,ord_q), seasonal_order=seas, enforce_stationarity=True, enforce_invertibility=True) res = model.fit(method='lbfgs', disp=False, maxiter=100) yhat = res.get_forecast(steps=fh).predicted_mean.to_numpy() except Exception as ex: return {"error": f"SARIMAX error: {ex}"} elif method_l == 'ets': if not _SM_ETS_AVAILABLE: return {"error": "ETS requires statsmodels"} try: res = _ETS(y.astype(float), trend=None, seasonal=None, initialization_method='heuristic').fit(optimized=True) yhat = np.asarray(res.forecast(fh), dtype=float) except Exception as ex: return {"error": f"ETS error: {ex}"} elif method_l == 'theta': # theta on proxy yy = y.astype(float); n=yy.size; tt=np.arange(1,n+1,dtype=float) A=np.vstack([np.ones(n),tt]).T; coef,_a,_b,_c = np.linalg.lstsq(A, yy, rcond=None); a=float(coef[0]); b=float(coef[1]) trend_future = a + b * (tt[-1] + np.arange(1, fh+1, dtype=float)) alpha = float(p.get('alpha', 0.2)); level=float(yy[0]) for v in yy[1:]: level = alpha*float(v) + (1.0-alpha)*level yhat = 0.5*(trend_future + np.full(fh, level, dtype=float)) elif method_l == 'mlf_rf': if not _MLF_AVAILABLE: return {"error": "mlf_rf requires 'mlforecast' and 'scikit-learn'"} try: from mlforecast import MLForecast as _MLForecast # type: ignore from sklearn.ensemble import RandomForestRegressor as _RF # type: ignore import pandas as _pd except Exception as ex: return {"error": f"Failed to import mlforecast/sklearn: {ex}"} try: ts = _pd.to_datetime(df['time'].iloc[1:].astype(float), unit='s', utc=True) if r.size == (len(df)-1) else _pd.date_range(periods=len(y), freq=_pd_freq_from_timeframe(timeframe)) except Exception: import pandas as _pd ts = _pd.date_range(periods=len(y), freq=_pd_freq_from_timeframe(timeframe)) Y_df = _pd.DataFrame({'unique_id': ['ts']*int(len(y)), 'ds': _pd.Index(ts).to_pydatetime(), 'y': y.astype(float)}) lags = p.get('lags') or [1,2,3,4,5] try: lags = [int(v) for v in lags] except Exception: lags = [1,2,3,4,5] rf = _RF(n_estimators=int(p.get('n_estimators', 200)), random_state=42) try: mlf = _MLForecast(models=[rf], freq=_pd_freq_from_timeframe(timeframe)).add_lags(lags) mlf.fit(Y_df) Yf = mlf.predict(h=int(fh)) try: Yf = Yf[Yf['unique_id']=='ts'] except Exception: pass yhat = np.asarray((Yf['y'] if 'y' in Yf.columns else Yf.iloc[:, -1]).to_numpy(), dtype=float) except Exception as ex: return {"error": f"mlf_rf error: {ex}"} elif method_l == 'nhits': if not _NF_AVAILABLE: return {"error": "nhits requires 'neuralforecast[torch]'"} try: from neuralforecast import NeuralForecast as _NeuralForecast # type: ignore from neuralforecast.models import NHITS as _NF_NHITS # type: ignore import pandas as _pd except Exception as ex: return {"error": f"Failed to import neuralforecast: {ex}"} max_epochs = int(p.get('max_epochs', 30)) batch_size = int(p.get('batch_size', 32)) if p.get('input_size') is not None: input_size = int(p['input_size']) else: base = max(64, 96) input_size = int(min(len(y), base)) try: ts = _pd.to_datetime(df['time'].iloc[1:].astype(float), unit='s', utc=True) if r.size == (len(df)-1) else _pd.date_range(periods=len(y), freq=_pd_freq_from_timeframe(timeframe)) except Exception: import pandas as _pd ts = _pd.date_range(periods=len(y), freq=_pd_freq_from_timeframe(timeframe)) Y_df = _pd.DataFrame({'unique_id': ['ts']*int(len(y)), 'ds': _pd.Index(ts).to_pydatetime(), 'y': y.astype(float)}) model = _NF_NHITS(h=int(fh), input_size=int(input_size), max_epochs=int(max_epochs), batch_size=int(batch_size)) try: nf = _NeuralForecast(models=[model], freq=_pd_freq_from_timeframe(timeframe)) nf.fit(df=Y_df, verbose=False) Yf = nf.predict() try: Yf = Yf[Yf['unique_id']=='ts'] except Exception: pass pred_col = None for c in list(Yf.columns): if c not in ('unique_id','ds','y'): pred_col = c if c == 'y_hat': break if pred_col is None: return {"error": "nhits prediction columns not found"} yhat = np.asarray(Yf[pred_col].to_numpy(), dtype=float) except Exception as ex: return {"error": f"nhits error: {ex}"} else: return {"error": f"Unsupported general method for volatility proxy: {method_l}"} # Back-transform to per-step sigma and aggregate horizon if back == 'sqrt': sig = np.sqrt(np.clip(yhat, 0.0, None)) elif back == 'abs': sig = np.maximum(0.0, yhat) * math.sqrt(math.pi/2.0) else: sig = np.sqrt(np.exp(yhat)) hsig = float(math.sqrt(np.sum(sig[:fh]**2))) # Current sigma (baseline) sbar = float(np.std(r[-100:], ddof=0) if r.size>=5 else np.std(r, ddof=0)) bpy = float(365.0*24.0*3600.0/float(tf_secs)) return {"success": True, "symbol": symbol, "timeframe": timeframe, "method": method_l, "proxy": proxy_l, "horizon": int(horizon), "sigma_bar_return": sbar, "sigma_annual_return": float(sbar*math.sqrt(bpy)), "horizon_sigma_return": hsig, "horizon_sigma_annual": float(hsig*math.sqrt(bpy/max(1,int(horizon)))), "params_used": p} # Direct volatility methods # Fetch history sized by method def _need_bars_direct() -> int: if method_l == 'ewma': lb = int(p.get('lookback', 1500)); return max(lb + 5, int(horizon) + 5) if method_l in {'parkinson','gk','rs','yang_zhang','rolling_std','realized_kernel'}: w = int(p.get('window', 20)); return max(w + int(horizon) + 10, 60) if method_l in garch_family: fb = int(p.get('fit_bars', 2000)); return max(fb + 10, int(horizon) + 10) return max(300, int(horizon) + 50) need = _need_bars_direct() _info_before = mt5.symbol_info(symbol) _was_visible = bool(_info_before.visible) if _info_before is not None else None err = _ensure_symbol_ready(symbol) if err: return {"error": err} try: if as_of: to_dt = _parse_start_datetime_util(as_of) if not to_dt: return {"error": "Invalid as_of time."} rates = _mt5_copy_rates_from(symbol, mt5_tf, to_dt, need) else: _tick = mt5.symbol_info_tick(symbol) if _tick is not None and getattr(_tick, 'time', None): t_utc = _mt5_epoch_to_utc(float(_tick.time)) server_now_dt = datetime.utcfromtimestamp(t_utc) else: server_now_dt = datetime.utcnow() rates = _mt5_copy_rates_from(symbol, mt5_tf, server_now_dt, need) finally: if _was_visible is False: try: mt5.symbol_select(symbol, False) except Exception: pass if rates is None or len(rates) < 3: return {"error": f"Failed to get sufficient rates for {symbol}: {mt5.last_error()}"} df = pd.DataFrame(rates) if as_of is None and len(df) >= 2: df = df.iloc[:-1] if len(df) < 3: return {"error": "Not enough closed bars"} # Normalize and apply denoise spec (uniform behavior) dn_spec_used = None if denoise is not None: try: dn_spec_used = _normalize_denoise_spec(denoise, default_when='pre_ti') except Exception: dn_spec_used = None if dn_spec_used: if method_l in {'parkinson','gk','rs','yang_zhang'} and not dn_spec_used.get('columns'): dn_spec_used['columns'] = ['open','high','low','close'] _apply_denoise(df, dn_spec_used, default_when='pre_ti') # Compute returns and helpers with np.errstate(divide='ignore', invalid='ignore'): r = np.diff(np.log(np.maximum(df['close'].astype(float).to_numpy(), 1e-12))) r = r[np.isfinite(r)] if r.size < 5: return {"error": "Insufficient returns to estimate volatility"} bpy = float(365.0*24.0*3600.0/float(tf_secs)) if method_l == 'ewma': lb = int(p.get('lookback', 1500)); halflife = p.get('halflife'); lam = p.get('lambda_', 0.94) tail = r[-lb:] if r.size >= lb else r if halflife is not None: try: lam = 1.0 - math.log(2.0) / float(halflife) except Exception: lam = 0.94 lam = float(lam) w = np.power(lam, np.arange(len(tail)-1, -1, -1, dtype=float)); w /= float(np.sum(w)) sigma2 = float(np.sum(w * (tail * tail))) sbar = math.sqrt(max(0.0, sigma2)) hsig = float(sbar * math.sqrt(max(1, int(horizon)))) return {"success": True, "symbol": symbol, "timeframe": timeframe, "method": method_l, "horizon": int(horizon), "sigma_bar_return": sbar, "sigma_annual_return": float(sbar*math.sqrt(bpy)), "horizon_sigma_return": hsig, "horizon_sigma_annual": float(hsig*math.sqrt(bpy/max(1,int(horizon)))), "params_used": {"lookback": lb, "lambda_": lam}, "denoise_used": dn_spec_used} if method_l in {'parkinson','gk','rs','yang_zhang','rolling_std'}: window = int(p.get('window', 20)) o = df['open'].astype(float).to_numpy(); h = df['high'].astype(float).to_numpy(); l = df['low'].astype(float).to_numpy(); c = df['close'].astype(float).to_numpy() if method_l == 'parkinson': v = _parkinson_sigma_sq(h, l) elif method_l == 'gk': v = _garman_klass_sigma_sq(o, h, l, c) elif method_l == 'rs': v = _rogers_satchell_sigma_sq(o, h, l, c) elif method_l == 'yang_zhang': with np.errstate(divide='ignore', invalid='ignore'): oc = np.log(np.maximum(o[1:],1e-12)) - np.log(np.maximum(c[:-1],1e-12)) co = np.log(np.maximum(c,1e-12)) - np.log(np.maximum(o,1e-12)) rs = (np.log(np.maximum(h,1e-12)) - np.log(np.maximum(c,1e-12))) * (np.log(np.maximum(h,1e-12)) - np.log(np.maximum(o,1e-12))) + \ (np.log(np.maximum(l,1e-12)) - np.log(np.maximum(c,1e-12))) * (np.log(np.maximum(l,1e-12)) - np.log(np.maximum(o,1e-12))) k = 0.34/(1.34 + (window+1)/(window-1)) if window>1 else 0.34 co_var = pd.Series(co[1:]).rolling(window=window, min_periods=window).var(ddof=0).to_numpy() oc_var = pd.Series(oc).rolling(window=window, min_periods=window).var(ddof=0).to_numpy() rs_mean = pd.Series(rs).rolling(window=window, min_periods=window).mean().to_numpy() v = (co_var + k*oc_var + (1-k)*rs_mean) else: v = pd.Series(r*r).rolling(window=window, min_periods=window).mean().to_numpy() sigma2 = float(v[-1]) if np.isfinite(v[-1]) else float(np.nanmean(v[-window:])) sbar = math.sqrt(max(0.0, sigma2)) hsig = float(sbar * math.sqrt(max(1, int(horizon)))) return {"success": True, "symbol": symbol, "timeframe": timeframe, "method": method_l, "horizon": int(horizon), "sigma_bar_return": sbar, "sigma_annual_return": float(sbar*math.sqrt(bpy)), "horizon_sigma_return": hsig, "horizon_sigma_annual": float(hsig*math.sqrt(bpy/max(1,int(horizon)))), "params_used": {"window": int(window)}, "denoise_used": dn_spec_used} if method_l == 'realized_kernel': window = int(p.get('window', 50)) kernel = str(p.get('kernel', 'tukey_hanning') or 'tukey_hanning') bandwidth = p.get('bandwidth') try: bandwidth_val = int(bandwidth) if bandwidth is not None else None except Exception: bandwidth_val = None tail = r[-window:] if r.size >= window else r rk_var = _realized_kernel_variance(tail, bandwidth=bandwidth_val, kernel=kernel) if not math.isfinite(rk_var) or rk_var < 0: return {"error": "Failed to compute realized kernel variance"} sigma_bar = math.sqrt(rk_var) sigma_h = math.sqrt(max(1, int(horizon)) * rk_var) return { "success": True, "symbol": symbol, "timeframe": timeframe, "method": method_l, "horizon": int(horizon), "sigma_bar_return": float(sigma_bar), "sigma_annual_return": float(sigma_bar * math.sqrt(bpy)), "horizon_sigma_return": float(sigma_h), "horizon_sigma_annual": float(sigma_h * math.sqrt(bpy / max(1, int(horizon)))), "params_used": {"window": int(window), "kernel": kernel, "bandwidth": bandwidth_val}, "denoise_used": dn_spec_used, } if method_l in garch_family: fit_bars = int(p.get('fit_bars', 2000)); mean_model = str(p.get('mean','Zero')).lower(); dist = str(p.get('dist','normal')) r_pct = 100.0 * r r_fit = r_pct[-fit_bars:] if r_pct.size > fit_bars else r_pct try: base_method = method_l.replace('_t', '') if method_l.endswith('_t'): dist = 'studentst' p_order = int(p.get('p', 1)) q_order = int(p.get('q', 1)) if base_method == 'egarch': am = _arch_model(r_fit, mean=mean_model if mean_model in ('zero','constant') else 'zero', vol='EGARCH', p=p_order, q=q_order, dist=dist) elif base_method == 'gjr_garch': o_order = int(p.get('o', 1)) am = _arch_model(r_fit, mean=mean_model if mean_model in ('zero','constant') else 'zero', vol='GARCH', p=p_order, o=o_order, q=q_order, dist=dist) elif base_method == 'figarch': am = _arch_model(r_fit, mean=mean_model if mean_model in ('zero','constant') else 'zero', vol='FIGARCH', p=p_order, q=q_order, dist=dist) else: am = _arch_model(r_fit, mean=mean_model if mean_model in ('zero','constant') else 'zero', vol='GARCH', p=p_order, q=q_order, dist=dist) res = am.fit(disp='off') fc = res.forecast(horizon=max(1, int(horizon)), reindex=False) variances = fc.variance.values[-1] sbar = float(math.sqrt(max(0.0, float(variances[0])))) / 100.0 hsig = float(math.sqrt(max(0.0, float(np.sum(variances))))) / 100.0 params_used = {k: p[k] for k in p} params_used.update({ "dist": dist, "mean": mean_model, "p": p_order, "q": q_order, }) if base_method == 'gjr_garch': params_used['o'] = int(p.get('o', 1)) return {"success": True, "symbol": symbol, "timeframe": timeframe, "method": method_l, "horizon": int(horizon), "sigma_bar_return": sbar, "sigma_annual_return": float(sbar*math.sqrt(bpy)), "horizon_sigma_return": hsig, "horizon_sigma_annual": float(hsig*math.sqrt(bpy/max(1,int(horizon)))), "params_used": params_used, "denoise_used": dn_spec_used} except Exception as ex: return {"error": f"{method_l} error: {ex}"} # Ensemble meta-method: aggregate multiple base forecasts if method_l == 'ensemble': try: # Determine default base methods based on availability default_methods = ['theta', 'fourier_ols'] if _SM_ETS_AVAILABLE: default_methods.append('holt') # ARIMA/SARIMA can be added by user explicitly to avoid latency by default base_methods_in = p.get('methods') if isinstance(base_methods_in, (list, tuple)) and base_methods_in: base_methods = [str(m).lower().strip() for m in base_methods_in] else: base_methods = list(default_methods) # Remove invalid or recursive entries base_methods = [m for m in base_methods if m in _FORECAST_METHODS and m != 'ensemble'] # Deduplicate while preserving order seen = set() base_methods = [m for m in base_methods if not (m in seen or seen.add(m))] if not base_methods: return {"error": "Ensemble requires at least one valid base method"} aggregator = str(p.get('aggregator', 'mean')).lower() weights = p.get('weights') expose_components = bool(p.get('expose_components', True)) # Normalize weights if provided w = None if isinstance(weights, (list, tuple)) and len(weights) == len(base_methods): try: w_arr = np.array([float(x) for x in weights], dtype=float) if np.all(np.isfinite(w_arr)) and np.any(w_arr > 0): w = w_arr.clip(min=0) s = float(np.sum(w)) if s > 0: w = w / s else: w = None else: w = None except Exception: w = None comp_results = [] for bm in base_methods: try: # Pass through common args; avoid per-method params for MVP simplicity r = forecast( symbol=symbol, timeframe=timeframe, method=bm, # type: ignore horizon=horizon, lookback=lookback, as_of=as_of, params=None, ci_alpha=ci_alpha, target=target, # type: ignore denoise=denoise, ) if isinstance(r, dict) and r.get('success') and r.get('forecast_price'): comp_results.append((bm, r)) except Exception: continue if not comp_results: return {"error": "Ensemble failed: no successful base forecasts"} # Establish reference horizon and timestamps from first successful component first_method, first_res = comp_results[0] ref_prices = np.array(first_res.get('forecast_price', []), dtype=float) fh = int(len(ref_prices)) if fh <= 0: return {"error": "Ensemble failed: empty forecast from base methods"} # Collect aligned component arrays; drop any mismatched lengths comps_prices = [] comps_returns = [] lower_list = [] upper_list = [] used_methods = [] for bm, r in comp_results: fp = r.get('forecast_price') if not isinstance(fp, (list, tuple)) or len(fp) != fh: continue used_methods.append(bm) comps_prices.append(np.array(fp, dtype=float)) fr = r.get('forecast_return') if isinstance(fr, (list, tuple)) and len(fr) == fh: comps_returns.append(np.array(fr, dtype=float)) lp = r.get('lower_price'); up = r.get('upper_price') if isinstance(lp, (list, tuple)) and isinstance(up, (list, tuple)) and len(lp) == fh and len(up) == fh: lower_list.append(np.array(lp, dtype=float)) upper_list.append(np.array(up, dtype=float)) if len(comps_prices) == 0: return {"error": "Ensemble failed: no aligned component forecasts"} M = len(comps_prices) # Choose weights if aggregator == 'weighted' and w is not None and len(w) == M: w_use = np.array(w, dtype=float) else: w_use = np.full(M, 1.0 / M, dtype=float) aggregator = 'mean' if aggregator == 'weighted' else aggregator X = np.vstack(comps_prices) # shape (M, fh) if aggregator == 'median': agg_price = np.median(X, axis=0) else: # mean or default agg_price = np.average(X, axis=0, weights=w_use) # Aggregate returns if all components provided them; otherwise skip if len(comps_returns) == M: XR = np.vstack(comps_returns) if aggregator == 'median': agg_return = np.median(XR, axis=0) else: agg_return = np.average(XR, axis=0, weights=w_use) else: agg_return = None # Aggregate confidence intervals only if all components have them if len(lower_list) == M and len(upper_list) == M: L = np.vstack(lower_list) U = np.vstack(upper_list) if aggregator == 'median': agg_lower = np.median(L, axis=0) agg_upper = np.median(U, axis=0) else: agg_lower = np.average(L, axis=0, weights=w_use) agg_upper = np.average(U, axis=0, weights=w_use) else: agg_lower = None agg_upper = None # Build payload using the first component as template for metadata/times payload: Dict[str, Any] = { "success": True, "symbol": first_res.get('symbol', symbol), "timeframe": first_res.get('timeframe', timeframe), "method": "ensemble", "target": first_res.get('target', str(target)), "params_used": { "base_methods": used_methods, "aggregator": aggregator, "weights": [float(x) for x in (w_use.tolist() if isinstance(w_use, np.ndarray) else [])], }, "lookback_used": int(first_res.get('lookback_used', 0)), "horizon": int(first_res.get('horizon', horizon)), "seasonality_period": int(first_res.get('seasonality_period', 0)), "as_of": first_res.get('as_of', as_of or None), "train_start": first_res.get('train_start'), "train_end": first_res.get('train_end'), "times": first_res.get('times'), "forecast_price": [float(v) for v in agg_price.tolist()], } # Timezone flag passthrough if present if 'timezone' in first_res: payload['timezone'] = first_res.get('timezone') # Trend: reuse first component's for simplicity if 'forecast_trend' in first_res: payload['forecast_trend'] = first_res.get('forecast_trend') if agg_return is not None: payload['forecast_return'] = [float(v) for v in agg_return.tolist()] if agg_lower is not None and agg_upper is not None and ci_alpha is not None: payload['lower_price'] = [float(v) for v in agg_lower.tolist()] payload['upper_price'] = [float(v) for v in agg_upper.tolist()] payload['ci_alpha'] = float(ci_alpha) if expose_components: comps_out = [] for i, (bm, r) in enumerate(comp_results): try: comps_out.append({ "method": bm, "weight": float(w_use[i]) if i < len(w_use) else float(1.0 / M), "forecast_price": r.get('forecast_price'), }) except Exception: continue payload['components'] = comps_out return payload except Exception as ex: return {"error": f"Error computing ensemble forecast: {ex}"} # Backward compatibility for 'lambda' -> 'lambda_' if 'lambda' in p and 'lambda_' not in p: p['lambda_'] = p['lambda'] # Determine bars required if method_l == 'ewma': lookback = int(p.get('lookback', 1500)) need = max(lookback + 2, 100) elif method_l in ('parkinson','gk','rs'): window = int(p.get('window', 20)) need = max(window + 2, 50) else: # garch fit_bars = int(p.get('fit_bars', 2000)) need = max(fit_bars + 2, 500) # Ensure symbol is ready; remember original visibility to restore later _info_before = mt5.symbol_info(symbol) _was_visible = bool(_info_before.visible) if _info_before is not None else None err = _ensure_symbol_ready(symbol) if err: return {"error": err} try: # Use explicit as-of time if provided, else server time for alignment if as_of: to_dt = _parse_start_datetime_util(as_of) if not to_dt: return {"error": "Invalid as_of_datetime. Try '2025-08-29', '2025-08-29 14:30', 'yesterday 14:00'."} rates = _mt5_copy_rates_from(symbol, mt5_tf, to_dt, need) else: _tick = mt5.symbol_info_tick(symbol) if _tick is not None and getattr(_tick, 'time', None): t_utc = _mt5_epoch_to_utc(float(_tick.time)) server_now_dt = datetime.utcfromtimestamp(t_utc) else: server_now_dt = datetime.utcnow() rates = _mt5_copy_rates_from(symbol, mt5_tf, server_now_dt, need) finally: if _was_visible is False: try: mt5.symbol_select(symbol, False) except Exception: pass if rates is None or len(rates) < 3: return {"error": f"Failed to get sufficient rates for {symbol}: {mt5.last_error()}"} df = pd.DataFrame(rates) # Drop forming last bar only when using current 'now' as anchor; keep all for historical as_of if as_of is None and len(df) >= 2: df = df.iloc[:-1] if len(df) < 3: return {"error": "Not enough closed bars to compute volatility"} # Optionally denoise relevant columns prior to volatility estimation __stage = 'denoise' if denoise: try: # Default columns by method if user didn't provide _spec = dict(denoise) if 'columns' not in _spec or not _spec.get('columns'): if method_l in ('ewma', 'garch'): _spec['columns'] = ['close'] else: # range-based estimators rely on OHLC _spec['columns'] = ['open', 'high', 'low', 'close'] _apply_denoise(df, _spec, default_when='pre_ti') except Exception: # Fail-safe: ignore denoise errors and proceed with raw data pass # Prefer denoised columns if user asked to keep originals def _col(name: str) -> str: dn = f"{name}_dn" return dn if dn in df.columns else name closes = df[_col('close')].to_numpy(dtype=float) highs = df[_col('high')].to_numpy(dtype=float) if 'high' in df.columns else None lows = df[_col('low')].to_numpy(dtype=float) if 'low' in df.columns else None opens = df[_col('open')].to_numpy(dtype=float) if 'open' in df.columns else None last_close = float(closes[-1]) bars_per_year = _bars_per_year(timeframe) ann_factor = math.sqrt(bars_per_year) if bars_per_year > 0 else float('nan') sigma_bar = float('nan') sigma_ann = float('nan') sigma_h_bar = float('nan') # horizon sigma of sum of returns over k bars params_used: Dict[str, Any] = {} if method_l == 'ewma': r = _log_returns_from_closes(closes) if r.size < 5: return {"error": "Not enough return observations for EWMA"} lam = p.get('lambda_') hl = p.get('halflife') if hl is not None: try: hl = float(hl) except Exception: hl = None if lam is not None: try: lam = float(lam) except Exception: lam = None if lam is not None and (hl is None): alpha = 1.0 - float(lam) var_series = pd.Series(r).ewm(alpha=alpha, adjust=False).var(bias=False) params_used['lambda'] = float(lam) else: # default halflife if not provided if hl is None: # heuristic per timeframe default_hl = 60 if timeframe.startswith('H') else (11 if timeframe == 'D1' else 180) hl = float(p.get('halflife', default_hl)) var_series = pd.Series(r).ewm(halflife=float(hl), adjust=False).var(bias=False) params_used['halflife'] = float(hl) v = float(var_series.iloc[-1]) v = v if math.isfinite(v) and v >= 0 else float('nan') sigma_bar = math.sqrt(v) if math.isfinite(v) and v >= 0 else float('nan') sigma_h_bar = math.sqrt(max(1, int(horizon)) * v) if math.isfinite(v) and v >= 0 else float('nan') elif method_l in ('parkinson','gk','rs'): if highs is None or lows is None: return {"error": "High/Low data required for range-based estimators"} window = int(p.get('window', 20)) params_used['window'] = window if method_l == 'parkinson': var_bars = _parkinson_sigma_sq(highs, lows) elif method_l == 'gk': if opens is None: return {"error": "Open data required for Garman–Klass"} var_bars = _garman_klass_sigma_sq(opens, highs, lows, closes) else: # rs if opens is None: return {"error": "Open data required for Rogers–Satchell"} var_bars = _rogers_satchell_sigma_sq(opens, highs, lows, closes) s = pd.Series(var_bars) v = float(s.tail(window).mean(skipna=True)) v = v if math.isfinite(v) and v >= 0 else float('nan') sigma_bar = math.sqrt(v) if math.isfinite(v) and v >= 0 else float('nan') sigma_h_bar = math.sqrt(max(1, int(horizon)) * v) if math.isfinite(v) and v >= 0 else float('nan') elif method_l == 'har_rv': # HAR-RV on daily realized variance computed from intraday returns try: rv_tf = str(p.get('rv_timeframe', 'M5')).upper() rv_mt5_tf = TIMEFRAME_MAP.get(rv_tf) if rv_mt5_tf is None: return {"error": f"Invalid rv_timeframe: {rv_tf}"} days = int(p.get('days', 120)) w = int(p.get('window_w', 5)) m = int(p.get('window_m', 22)) rv_tf_secs = TIMEFRAME_SECONDS.get(rv_tf, 300) bars_needed = int(days * max(1, (86400 // max(1, rv_tf_secs))) + 50) _info_before = mt5.symbol_info(symbol) _was_visible = bool(_info_before.visible) if _info_before is not None else None err = _ensure_symbol_ready(symbol) if err: return {"error": err} try: if as_of: to_dt = _parse_start_datetime_util(as_of) if not to_dt: return {"error": "Invalid as_of time."} rates_rv = _mt5_copy_rates_from(symbol, rv_mt5_tf, to_dt, bars_needed) else: _tick = mt5.symbol_info_tick(symbol) if _tick is not None and getattr(_tick, 'time', None): t_utc = _mt5_epoch_to_utc(float(_tick.time)) server_now_dt = datetime.utcfromtimestamp(t_utc) else: server_now_dt = datetime.utcnow() rates_rv = _mt5_copy_rates_from(symbol, rv_mt5_tf, server_now_dt, bars_needed) finally: if _was_visible is False: try: mt5.symbol_select(symbol, False) except Exception: pass if rates_rv is None or len(rates_rv) < 50: return {"error": f"Failed to get intraday rates for RV: {mt5.last_error()}"} dfrv = pd.DataFrame(rates_rv) if as_of is None and len(dfrv) >= 2: dfrv = dfrv.iloc[:-1] c = dfrv['close'].astype(float).to_numpy() if c.size < 10: return {"error": "Insufficient intraday bars for RV"} with np.errstate(divide='ignore', invalid='ignore'): rr = np.diff(np.log(np.maximum(c, 1e-12))) rr = rr[np.isfinite(rr)] dt = pd.to_datetime(dfrv['time'].iloc[1:].astype(float), unit='s', utc=True) days_idx = pd.DatetimeIndex(dt).floor('D') df_r = pd.DataFrame({'day': days_idx, 'r2': rr * rr}) daily_rv = df_r.groupby('day')['r2'].sum().astype(float) if len(daily_rv) < max(30, m + 5): return {"error": "Not enough daily RV observations for HAR-RV"} RV = daily_rv.to_numpy(dtype=float) N = RV.size # Lagged features Dlag = RV[:-1] def rmean(arr, k): s = pd.Series(arr) return s.rolling(window=k, min_periods=k).mean().to_numpy() Wlag_full = rmean(RV, w) # aligned to current index Mlag_full = rmean(RV, m) # Build design for y=RV[1:] y = RV[1:] Wlag = Wlag_full[:-1] Mlag = Mlag_full[:-1] Xd = Dlag mask = np.isfinite(Xd) & np.isfinite(Wlag) & np.isfinite(Mlag) & np.isfinite(y) X = np.vstack([np.ones_like(Xd[mask]), Xd[mask], Wlag[mask], Mlag[mask]]).T yv = y[mask] if X.shape[0] < 20: return {"error": "Insufficient samples after alignment for HAR-RV"} beta, *_ = np.linalg.lstsq(X, yv, rcond=None) # Next-day forecast from last lags D_last = RV[-1] W_last = float(pd.Series(RV).tail(w).mean()) M_last = float(pd.Series(RV).tail(m).mean()) rv_next = float(beta[0] + beta[1]*D_last + beta[2]*W_last + beta[3]*M_last) rv_next = max(0.0, rv_next) # Map to per-bar and horizon sigma for requested timeframe tf_secs = TIMEFRAME_SECONDS.get(timeframe) if not tf_secs: return {"error": f"Unsupported timeframe seconds for {timeframe}"} bars_per_day = float(86400.0 / float(tf_secs)) sbar = float(math.sqrt(rv_next / bars_per_day)) h_days = float(int(horizon)) / bars_per_day hsig = float(math.sqrt(rv_next * max(h_days, 0.0))) bpy = float(365.0 * 24.0 * 3600.0 / float(tf_secs)) return {"success": True, "symbol": symbol, "timeframe": timeframe, "method": method_l, "horizon": int(horizon), "sigma_bar_return": sbar, "sigma_annual_return": float(sbar*math.sqrt(bpy)), "horizon_sigma_return": hsig, "horizon_sigma_annual": float(hsig*math.sqrt(bpy/max(1,int(horizon)))), "params_used": {"rv_timeframe": rv_tf, "window_w": w, "window_m": m, "beta": [float(b) for b in beta.tolist()], "days": days}, "denoise_used": dn_spec_used} except Exception as ex: return {"error": f"HAR-RV error: {ex}"} else: # garch r = _log_returns_from_closes(closes) if r.size < 100: return {"error": "Not enough return observations for GARCH (need >=100)"} fit_bars = int(p.get('fit_bars', min(2000, r.size))) mean_model = str(p.get('mean', 'Zero')) dist = str(p.get('dist', 'normal')) params_used.update({'fit_bars': fit_bars, 'mean': mean_model, 'dist': dist}) r_fit = pd.Series(r[-fit_bars:]) * 100.0 # scale to percent try: am = _arch_model(r_fit, mean=mean_model.lower(), vol='GARCH', p=1, q=1, dist=dist) res = am.fit(disp='off') # Current conditional variance (percent^2) cond_vol = float(res.conditional_volatility.iloc[-1]) # percent sigma_bar = cond_vol / 100.0 # k-step ahead variance forecasts (percent^2) fc = res.forecast(horizon=max(1, int(horizon)), reindex=False) var_path = np.array(fc.variance.iloc[-1].values, dtype=float) # shape (horizon,) var_sum = float(np.nansum(var_path)) # percent^2 sigma_h_bar = math.sqrt(var_sum) / 100.0 except Exception as ex: return {"error": f"GARCH fitting error: {ex}"} sigma_ann = sigma_bar * ann_factor if math.isfinite(sigma_bar) and math.isfinite(ann_factor) else float('nan') sigma_h_ann = sigma_h_bar * ann_factor if math.isfinite(sigma_h_bar) and math.isfinite(ann_factor) else float('nan') return { "success": True, "symbol": symbol, "timeframe": timeframe, "method": method_l, "params_used": params_used, "bars_used": int(len(df)), "horizon": int(horizon), "last_close": last_close, "sigma_bar_return": sigma_bar, "sigma_annual_return": sigma_ann, "horizon_sigma_return": sigma_h_bar, "horizon_sigma_annual": sigma_h_ann, "as_of": as_of or None, "denoise_used": dn_spec_used, } except Exception as e: return {"error": f"Error computing volatility forecast: {str(e)}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/emerzon/mt-data-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server