MetaTrader5 MCP Server

by emerzon
forecast_engine.py (28.6 kB)
""" Forecast engine core logic and orchestration. """ from typing import Any, Dict, Optional, List, Literal, Tuple from datetime import datetime import numpy as np import pandas as pd import math import warnings import sys import os # Add the src directory to path for relative imports sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..')) from mtdata.core.constants import TIMEFRAME_MAP, TIMEFRAME_SECONDS from mtdata.utils.mt5 import _mt5_epoch_to_utc, _mt5_copy_rates_from, _ensure_symbol_ready from mtdata.utils.utils import ( _parse_start_datetime as _parse_start_datetime_util, _format_time_minimal as _format_time_minimal_util, _format_time_minimal_local as _format_time_minimal_local_util, _use_client_tz as _use_client_tz_util, ) from mtdata.utils.indicators import _parse_ti_specs as _parse_ti_specs_util, _apply_ta_indicators as _apply_ta_indicators_util from mtdata.utils.denoise import _apply_denoise, normalize_denoise_spec as _normalize_denoise_spec from mtdata.forecast.common import ( parse_kv_or_json as _parse_kv_or_json, fetch_history as _fetch_history, default_seasonality as _default_seasonality_period, next_times_from_last as _next_times_from_last, pd_freq_from_timeframe as _pd_freq_from_timeframe, ) from mtdata.forecast.target_builder import build_target_series from mtdata.forecast.registry import ForecastRegistry # Import all method modules to ensure registration import mtdata.forecast.methods.classical import mtdata.forecast.methods.ets_arima import mtdata.forecast.methods.statsforecast import mtdata.forecast.methods.mlforecast import mtdata.forecast.methods.pretrained import mtdata.forecast.methods.neural import mtdata.forecast.methods.sktime import mtdata.forecast.methods.analog import MetaTrader5 as mt5 _ENSEMBLE_BASE_METHODS = ( 'naive', 'drift', 'seasonal_naive', 'theta', 'fourier_ols', 'ses', 'holt', 'holt_winters_add', 'holt_winters_mul', 'arima', 'sarima', ) def _normalize_weights(weights: Any, size: int) -> Optional[np.ndarray]: if weights is None: return None vals: List[float] = [] if isinstance(weights, (list, tuple)): vals = [float(v) for v in list(weights)[:size]] elif isinstance(weights, str): parts = [p.strip() for p in weights.split(',') if p.strip()] vals = [float(p) for p in parts[:size]] else: return None if len(vals) != size: return None arr = np.asarray(vals, dtype=float) if not np.all(np.isfinite(arr)): return None arr = np.clip(arr, a_min=0.0, a_max=None) total = float(np.sum(arr)) if total <= 0: return None return arr / total def _ensemble_dispatch_method( method_name: str, series: pd.Series, horizon: int, seasonality: Optional[int], params: Optional[Dict[str, Any]], ) -> Optional[np.ndarray]: """Run a supported ensemble base method with safe fallbacks.""" m = str(method_name).lower().strip() # Allow any registered method in ensemble if it supports what we need # But for safety/speed, we might restrict to fast methods or check registry kwargs = dict(params or {}) try: forecaster = ForecastRegistry.get(m) res = forecaster.forecast(series, horizon, seasonality or 1, kwargs) return res.forecast except Exception: return None def _prepare_ensemble_cv( series: pd.Series, methods: List[str], horizon: int, seasonality: Optional[int], params_map: Dict[str, Dict[str, Any]], cv_points: int, min_train: int, ) -> Tuple[np.ndarray, np.ndarray]: """Collect walk-forward one-step predictions for ensemble weighting.""" n = len(series) if n <= max(min_train, horizon + 2): return np.empty((0, len(methods))), np.empty((0,)) end = n - horizon candidate_idx = 


def _ensemble_dispatch_method(
    method_name: str,
    series: pd.Series,
    horizon: int,
    seasonality: Optional[int],
    params: Optional[Dict[str, Any]],
) -> Optional[np.ndarray]:
    """Run a supported ensemble base method with safe fallbacks."""
    m = str(method_name).lower().strip()
    # Allow any registered method in the ensemble if it supports what we need.
    # For safety/speed we could restrict to fast methods or check the registry.
    kwargs = dict(params or {})
    try:
        forecaster = ForecastRegistry.get(m)
        res = forecaster.forecast(series, horizon, seasonality or 1, kwargs)
        return res.forecast
    except Exception:
        return None


def _prepare_ensemble_cv(
    series: pd.Series,
    methods: List[str],
    horizon: int,
    seasonality: Optional[int],
    params_map: Dict[str, Dict[str, Any]],
    cv_points: int,
    min_train: int,
) -> Tuple[np.ndarray, np.ndarray]:
    """Collect walk-forward one-step predictions for ensemble weighting."""
    n = len(series)
    if n <= max(min_train, horizon + 2):
        return np.empty((0, len(methods))), np.empty((0,))
    end = n - horizon
    candidate_idx = list(range(max(min_train, 3), end))
    if not candidate_idx:
        return np.empty((0, len(methods))), np.empty((0,))
    if cv_points and len(candidate_idx) > cv_points:
        candidate_idx = candidate_idx[-cv_points:]
    rows: List[List[float]] = []
    targets: List[float] = []
    for idx in candidate_idx:
        train = series.iloc[:idx]
        if len(train) < min_train:
            continue
        row: List[float] = []
        success = True
        for m in methods:
            fc = _ensemble_dispatch_method(m, train, horizon, seasonality, params_map.get(m, {}))
            if fc is None or fc.size == 0 or not math.isfinite(float(fc[0])):
                success = False
                break
            row.append(float(fc[0]))
        if not success:
            continue
        rows.append(row)
        targets.append(float(series.iloc[idx]))
    if not rows:
        return np.empty((0, len(methods))), np.empty((0,))
    return np.asarray(rows, dtype=float), np.asarray(targets, dtype=float)
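
# Worked example (illustrative): with len(series) == 100, horizon == 5,
# min_train == 30 and cv_points == 8, candidate_idx is range(30, 95) trimmed
# to its last 8 entries (87..94). Each surviving row of X holds one
# one-step-ahead prediction per method and y holds the realized values, so X
# has up to 8 rows and one column per base method.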


# Local fallbacks for typing aliases used in signatures (avoid import cycle)
try:
    from ..core.server import ForecastMethodLiteral, TimeframeLiteral, DenoiseSpec  # type: ignore
except Exception:  # runtime fallback
    ForecastMethodLiteral = str
    TimeframeLiteral = str
    DenoiseSpec = Dict[str, Any]


# Supported forecast methods, dynamically fetched from the registry
def _get_available_methods():
    return tuple(ForecastRegistry.get_all_method_names())


_FORECAST_METHODS = _get_available_methods()


def _calculate_lookback_bars(method_l: str, horizon: int, lookback: Optional[int], seasonality: int, timeframe: str) -> int:
    """Calculate the number of bars needed for forecasting."""
    if lookback is not None and lookback > 0:
        return int(lookback) + 2
    if method_l == 'analog':
        # Default search_depth=5000, window=64, but params are not visible here
        # without changing the signature. AnalogMethod re-fetches its own
        # history, so this value only has to satisfy the engine's len(df) < 3
        # check on the target series passed to it; a small default suffices.
        return max(100, int(horizon) + 10)
    if method_l == 'seasonal_naive':
        return max(3 * seasonality, int(horizon) + seasonality + 2)
    elif method_l in ('theta', 'fourier_ols'):
        return max(300, int(horizon) + (2 * seasonality if seasonality else 50))
    else:
        # naive, drift and others
        return max(100, int(horizon) + 10)


def _prepare_base_data(df: pd.DataFrame, quantity: str, target: str) -> str:
    """Prepare the base data column for forecasting."""
    base_col = 'close'
    if quantity == 'return':
        df['__log_return'] = np.log(df['close'] / df['close'].shift(1))
        base_col = '__log_return'
    elif quantity == 'volatility':
        # Squared log returns proxy variance for both price and return targets;
        # compute the log returns unconditionally so neither path references
        # '__log_return' before assignment.
        df['__log_return'] = np.log(df['close'] / df['close'].shift(1))
        df['__squared_return'] = df['__log_return'] ** 2
        base_col = '__squared_return'
    return base_col


def _apply_features_and_target_spec(df: pd.DataFrame, features: Optional[Dict[str, Any]],
                                    target_spec: Optional[Dict[str, Any]], base_col: str) -> str:
    """Apply features and the target specification to the dataframe."""
    # Apply technical indicators if specified in features
    if features and isinstance(features, dict):
        ti_spec = features.get('ti')
        if ti_spec:
            ti_list = _parse_ti_specs_util(ti_spec)
            if ti_list:
                ti_cols = _apply_ta_indicators_util(df, ti_spec)
                # Update base_col if a TI column is specified as the target
                if target_spec and target_spec.get('column') in ti_cols:
                    base_col = target_spec.get('column')
    # Apply target column transformations
    if target_spec:
        target_col = target_spec.get('column', base_col)
        transform = target_spec.get('transform')
        if transform == 'log':
            df[f'__target_{target_col}'] = np.log(df[target_col])
            base_col = f'__target_{target_col}'
        elif transform == 'diff':
            df[f'__target_{target_col}'] = df[target_col].diff()
            base_col = f'__target_{target_col}'
        elif transform == 'pct':
            df[f'__target_{target_col}'] = df[target_col].pct_change()
            base_col = f'__target_{target_col}'
        elif target_col != base_col:
            base_col = target_col
    return base_col


def _apply_dimensionality_reduction(X: pd.DataFrame, dimred_method: Optional[str],
                                    dimred_params: Optional[Dict[str, Any]]) -> pd.DataFrame:
    """Apply dimensionality reduction to the feature matrix."""
    if not dimred_method or len(X.columns) <= 1:
        return X
    try:
        from sklearn.decomposition import PCA
        from sklearn.manifold import TSNE
        from sklearn.feature_selection import SelectKBest, f_regression

        params = dimred_params or {}
        if dimred_method.lower() == 'pca':
            n_components = params.get('n_components', min(5, X.shape[1]))
            reducer = PCA(n_components=n_components)
            X_reduced = reducer.fit_transform(X)
            return pd.DataFrame(X_reduced, columns=[f'pca_{i}' for i in range(X_reduced.shape[1])])
        elif dimred_method.lower() == 'tsne':
            n_components = params.get('n_components', 2)
            reducer = TSNE(n_components=n_components, random_state=42)
            X_reduced = reducer.fit_transform(X)
            return pd.DataFrame(X_reduced, columns=[f'tsne_{i}' for i in range(X_reduced.shape[1])])
        elif dimred_method.lower() == 'selectkbest':
            k = params.get('k', min(5, X.shape[1]))
            selector = SelectKBest(score_func=f_regression, k=k)
            # f_regression is supervised; without a target this raises and we
            # fall back to the original features below.
            X_reduced = selector.fit_transform(X, y=None)
            return pd.DataFrame(X_reduced, columns=[f'select_{i}' for i in range(X_reduced.shape[1])])
    except Exception:
        # Fall back to the original features if dimensionality reduction fails
        pass
    return X
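
# Worked example (illustrative): for a 500x12 exog frame,
# _apply_dimensionality_reduction(X, 'pca', None) keeps min(5, 12) = 5
# components and returns a 500x5 frame with columns pca_0..pca_4; an unknown
# method name or an sklearn failure returns X unchanged.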


def _format_forecast_output(
    forecast_values: np.ndarray,
    last_epoch: float,
    tf_secs: int,
    horizon: int,
    base_col: str,
    df: pd.DataFrame,
    ci_alpha: Optional[float],
    ci_values: Optional[np.ndarray],
    method: str,
    quantity: str,
    denoise_used: bool,
    metadata: Optional[Dict[str, Any]] = None,
    digits: Optional[int] = None,
    forecast_return_values: Optional[np.ndarray] = None,
    reconstructed_prices: Optional[np.ndarray] = None,
) -> Dict[str, Any]:
    """Format forecast output with the proper structure."""
    # Generate future time indices
    future_epochs = _next_times_from_last(last_epoch, tf_secs, horizon)

    # Time formatting
    _use_ctz = _use_client_tz_util()
    if _use_ctz:
        future_times = [_format_time_minimal_local_util(e) for e in future_epochs]
    else:
        future_times = [_format_time_minimal_util(e) for e in future_epochs]

    # Build the base result
    result: Dict[str, Any] = {
        "success": True,
        "method": method,
        "horizon": horizon,
        "base_col": base_col,
        "forecast_time": future_times,
        "forecast_epoch": future_epochs,
    }

    # Choose which arrays to expose
    if quantity == 'return':
        if forecast_return_values is None:
            forecast_return_values = forecast_values
        result["forecast_return"] = [float(v) for v in forecast_return_values]
        if reconstructed_prices is not None:
            result["forecast_price"] = [float(v) for v in reconstructed_prices]
    else:
        result["forecast_price"] = [float(v) for v in forecast_values]
    if digits is not None:
        result["digits"] = int(digits)

    # Add confidence intervals if available
    if ci_alpha is not None and ci_values is not None:
        result["ci_alpha"] = float(ci_alpha)
        if len(ci_values) == 2:  # [lower, upper]
            result["lower_price"] = [float(v) for v in ci_values[0]]
            result["upper_price"] = [float(v) for v in ci_values[1]]

    # Add metadata
    result.update({
        "last_epoch": float(last_epoch),
        "quantity": quantity,
        "denoise_applied": denoise_used,
    })
    if metadata:
        result.update(metadata)
    return result
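
# Illustrative output shape (keys as built above, values invented for the
# example):
# {"success": True, "method": "theta", "horizon": 3, "base_col": "close",
#  "forecast_time": [...], "forecast_epoch": [...],
#  "forecast_price": [1.1012, 1.1015, 1.1019],
#  "ci_alpha": 0.05, "lower_price": [...], "upper_price": [...],
#  "last_epoch": 1717171200.0, "quantity": "price", "denoise_applied": False}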


def forecast_engine(
    symbol: str,
    timeframe: TimeframeLiteral = "H1",
    method: ForecastMethodLiteral = "theta",
    horizon: int = 12,
    lookback: Optional[int] = None,
    as_of: Optional[str] = None,
    params: Optional[Dict[str, Any]] = None,
    ci_alpha: Optional[float] = 0.05,
    quantity: Literal['price', 'return', 'volatility'] = 'price',
    target: Literal['price', 'return'] = 'price',
    denoise: Optional[DenoiseSpec] = None,
    features: Optional[Dict[str, Any]] = None,
    dimred_method: Optional[str] = None,
    dimred_params: Optional[Dict[str, Any]] = None,
    target_spec: Optional[Dict[str, Any]] = None,
    exog_used: Optional[np.ndarray] = None,
    exog_future: Optional[np.ndarray] = None,
    prefetched_df: Optional[pd.DataFrame] = None,
    prefetched_base_col: Optional[str] = None,
    prefetched_denoise_spec: Optional[Any] = None,
) -> Dict[str, Any]:
    """Core forecast engine implementation.

    This is the main orchestration function that coordinates all forecasting
    operations.
    """
    try:
        ci_values = None
        # Initialize metadata up front so the ensemble path, which never sets
        # it, does not raise a NameError when formatting the output.
        metadata: Dict[str, Any] = {}

        # Coerce CLI string inputs to proper types
        try:
            horizon = int(horizon) if horizon is not None else 12
        except (ValueError, TypeError):
            horizon = 12
        try:
            lookback = int(lookback) if lookback is not None else None
        except (ValueError, TypeError):
            lookback = None

        # Validation
        if timeframe not in TIMEFRAME_MAP:
            return {"error": f"Invalid timeframe: {timeframe}. Valid options: {list(TIMEFRAME_MAP.keys())}"}
        mt5_tf = TIMEFRAME_MAP[timeframe]
        tf_secs = TIMEFRAME_SECONDS.get(timeframe)
        if not tf_secs:
            return {"error": f"Unsupported timeframe seconds for {timeframe}"}

        method_l = str(method).lower().strip()
        quantity_l = str(quantity).lower().strip()

        # Refresh available methods
        available_methods = _get_available_methods()
        if method_l not in available_methods:
            return {"error": f"Invalid method: {method}. Valid options: {list(available_methods)}"}

        # Volatility models have a dedicated endpoint
        if quantity_l == 'volatility' or method_l.startswith('vol_'):
            return {"error": "Use forecast_volatility for volatility models"}

        # Parse method params
        p = _parse_kv_or_json(params)
        seasonality = int(p.get('seasonality')) if p.get('seasonality') is not None else _default_seasonality_period(timeframe)
        if method_l == 'seasonal_naive' and (not seasonality or seasonality <= 0):
            return {"error": "seasonal_naive requires a positive 'seasonality' in params or auto period"}

        # Calculate lookback bars
        need = _calculate_lookback_bars(method_l, horizon, lookback, seasonality, timeframe)

        # Fetch data (or reuse prefetched) and apply optional denoising
        if prefetched_df is not None:
            df = prefetched_df
            base_col = prefetched_base_col or ('close_dn' if 'close_dn' in df.columns else 'close')
            dn_spec_used = prefetched_denoise_spec
        else:
            try:
                df = _fetch_history(symbol, timeframe, int(need), as_of)
            except Exception as ex:
                return {"error": str(ex)}
            if len(df) < 3:
                return {"error": "Not enough closed bars to compute forecast"}
            # Apply denoising
            base_col = 'close'
            dn_spec_used = None
            if denoise:
                try:
                    _dn = _normalize_denoise_spec(denoise, default_when='pre_ti')
                except Exception:
                    _dn = None
                added = _apply_denoise(df, _dn, default_when='pre_ti') if _dn else []
                dn_spec_used = _dn
                if len(added) > 0 and f"{base_col}_dn" in added:
                    base_col = f"{base_col}_dn"

        # Track last close for potential price reconstruction
        try:
            last_close = float(df['close'].iloc[-1])
        except Exception:
            last_close = float('nan')

        # Prepare base data
        base_col_initial = base_col
        base_col_prepared = _prepare_base_data(df, quantity_l, target)

        # Apply features and target specification
        base_col_prepared = _apply_features_and_target_spec(df, features, target_spec, base_col_prepared)

        # Prepare target series, honoring target_spec if provided
        target_series = df[base_col_prepared].dropna()
        target_info: Dict[str, Any] = {}
        if target_spec:
            try:
                y_arr, target_info = build_target_series(df, base_col_initial, target_spec, legacy_target=str(target))
                target_series = pd.Series(y_arr, index=df.index)
                base_col = target_info.get('base', base_col_initial)
            except Exception as ex:
                return {"error": f"Invalid target_spec: {ex}"}
        else:
            base_col = base_col_prepared
            if quantity_l == 'return' or str(target).lower() == 'return':
                target_series = df[base_col].dropna()
            else:
                target_series = df[base_col]
        target_series = target_series.dropna()
        if len(target_series) < 3:
            return {"error": f"Not enough valid data points in column '{base_col}'"}

        # Prepare the feature matrix if applicable (only if exog_used is not provided)
        X = exog_used
        if X is None and isinstance(features, dict) and features.get('exog'):
            exog_cols = features['exog']
            if isinstance(exog_cols, str):
                exog_cols = [c.strip() for c in exog_cols.split(',')]
            # Filter to available columns
            available_exog = [col for col in exog_cols if col in df.columns and col != base_col]
            if available_exog:
                X_df = df[available_exog].loc[target_series.index]
                # Apply dimensionality reduction if specified
                X_df = _apply_dimensionality_reduction(X_df, dimred_method, dimred_params)
                X = X_df.values

        # Get the last timestamp and values
        last_epoch = float(df['time'].iloc[-1])
        last_value = float(target_series.iloc[-1])

        # Get symbol info for digits
        digits = None
        try:
            s_info = mt5.symbol_info(symbol)
            if s_info:
                digits = s_info.digits
        except Exception:
            pass
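
        # Dispatch: 'ensemble' combines several registered base methods
        # (simple average, BMA-style weighting from walk-forward RMSE, or
        # stacking via least squares); every other method name resolves
        # through ForecastRegistry and runs once on the prepared series.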
        # Call the engine
        try:
            if method_l == 'ensemble':
                ensemble_meta = {}
                base_methods_in = p.get('methods')
                if isinstance(base_methods_in, str):
                    base_methods = [m.strip().lower() for m in base_methods_in.split(',') if m.strip()]
                elif isinstance(base_methods_in, (list, tuple)):
                    base_methods = [str(m).lower().strip() for m in base_methods_in if str(m).strip()]
                else:
                    base_methods = ['naive', 'theta', 'fourier_ols']
                # Filter to available methods and drop duplicates, preserving order
                avail = _get_available_methods()
                base_methods = [m for m in base_methods if m in avail and m != 'ensemble']
                seen: set[str] = set()
                base_methods = [m for m in base_methods if not (m in seen or seen.add(m))]
                if not base_methods:
                    base_methods = ['naive', 'theta']

                params_in = p.get('method_params') if isinstance(p.get('method_params'), dict) else {}
                params_map = {str(k).lower(): (v if isinstance(v, dict) else {}) for k, v in params_in.items()}
                mode = str(p.get('mode', 'average')).lower()
                cv_points = int(p.get('cv_points', max(6, len(base_methods) * 2)))
                min_train = int(p.get('min_train_size', max(30, horizon * 3)))
                expose_components = bool(p.get('expose_components', True))
                weights_vec = _normalize_weights(p.get('weights'), len(base_methods))
                ensemble_meta = {
                    'mode_requested': mode,
                    'methods': list(base_methods),
                    'cv_points_requested': cv_points,
                }
                effective_mode = mode
                rmse = None
                ensemble_intercept = 0.0
                coeffs = None
                cv_rows = 0
                if mode in ('bma', 'stacking'):
                    X_cv, y_cv = _prepare_ensemble_cv(target_series, base_methods, horizon, seasonality, params_map, cv_points, min_train)
                    cv_rows = int(len(y_cv))
                    if X_cv.shape[0] >= max(3, len(base_methods)):
                        if mode == 'bma':
                            errors = X_cv - y_cv[:, None]
                            rmse = np.sqrt(np.mean(np.square(errors), axis=0))
                            min_rmse = float(np.min(rmse))
                            weights_vec = np.exp(-0.5 * (rmse - min_rmse) / (min_rmse + 1e-12))
                            total = float(np.sum(weights_vec))
                            if total > 0:
                                weights_vec = weights_vec / total
                            else:
                                weights_vec = None
                        else:
                            X_aug = np.column_stack([np.ones(X_cv.shape[0]), X_cv])
                            beta, *_ = np.linalg.lstsq(X_aug, y_cv, rcond=None)
                            ensemble_intercept = float(beta[0])
                            coeffs = beta[1:]
                            effective_mode = 'stacking'
                    else:
                        effective_mode = 'average'
                if effective_mode != 'stacking':
                    if weights_vec is None:
                        weights_vec = np.full(len(base_methods), 1.0 / max(1, len(base_methods)))
                    else:
                        total = float(np.sum(weights_vec))
                        weights_vec = (weights_vec / total) if total > 0 else np.full(len(base_methods), 1.0 / max(1, len(base_methods)))

                component_methods: List[str] = []
                component_forecasts: List[np.ndarray] = []
                for m in base_methods:
                    fc = _ensemble_dispatch_method(m, target_series, horizon, seasonality, params_map.get(m, {}))
                    if fc is None or fc.size == 0:
                        continue
                    component_methods.append(m)
                    component_forecasts.append(fc)
                if not component_forecasts:
                    return {'error': 'Ensemble failed: no component forecasts'}
                if len(component_methods) != len(base_methods):
                    keep_idx = [base_methods.index(m) for m in component_methods]
                    if effective_mode == 'stacking' and coeffs is not None:
                        coeffs = coeffs[keep_idx]
                    elif weights_vec is not None:
                        weights_vec = weights_vec[keep_idx]
                    base_methods = component_methods

                if effective_mode != 'stacking':
                    total = float(np.sum(weights_vec)) if weights_vec is not None else 0.0
                    if weights_vec is None or total <= 0:
                        weights_vec = np.full(len(base_methods), 1.0 / len(base_methods))
                    else:
                        weights_vec = weights_vec / total
                    combined = np.zeros_like(component_forecasts[0], dtype=float)
                    for w, fc in zip(weights_vec, component_forecasts):
                        combined = combined + float(w) * fc
                else:
                    if coeffs is None or coeffs.size != len(base_methods):
                        coeffs = np.full(len(base_methods), 1.0 / len(base_methods))
                        ensemble_intercept = 0.0
                    combined = np.full_like(component_forecasts[0], ensemble_intercept, dtype=float)
                    for w, fc in zip(coeffs, component_forecasts):
                        combined = combined + float(w) * fc
                    weights_vec = coeffs

                forecast_values = combined
                ensemble_meta.update({
                    'mode_used': effective_mode,
                    'methods': list(base_methods),
                    'cv_points_used': cv_rows,
                    'weights': [float(w) for w in (weights_vec.tolist() if isinstance(weights_vec, np.ndarray) else weights_vec)],
                })
                if rmse is not None:
                    ensemble_meta['cv_rmse'] = [float(v) for v in rmse.tolist()]
                if effective_mode == 'stacking':
                    ensemble_meta['intercept'] = float(ensemble_intercept)
                if expose_components:
                    ensemble_meta['components'] = {m: [float(v) for v in fc.tolist()] for m, fc in zip(base_methods, component_forecasts)}
            else:
                # Use the registry for all other methods
                forecaster = ForecastRegistry.get(method_l)
                # X is the feature matrix for the training period. The engine
                # does not generate future exog automatically; it must arrive
                # via params/features. Some methods (e.g. ML) use the training
                # exog during fitting, so pass X as exog_used when available.
                kwargs = dict(p)
                kwargs['ci_alpha'] = ci_alpha
                kwargs['as_of'] = as_of
                if X is not None:
                    kwargs['exog_used'] = X
                if exog_future is not None:
                    kwargs['exog_future'] = exog_future

                # Call the forecaster
                res = forecaster.forecast(target_series, horizon, seasonality, kwargs)
                forecast_values = res.forecast
                ci_values = res.ci_values
                metadata = res.metadata or {}
                # Record the parameters actually used
                metadata['params_used'] = res.params_used
        except Exception as e:
            return {"error": f"Forecast method '{method}' failed: {str(e)}"}
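
        # Price reconstruction from log returns (applied below when
        # quantity == 'return'): with last close P0 and forecast log returns
        # r_1..r_h, the implied price path is P_k = P0 * exp(r_1 + ... + r_k),
        # i.e. last_close * np.exp(np.cumsum(r)).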
        if forecast_values is None:
            return {"error": f"Method '{method}' returned no forecast values"}

        # Prepare output arrays
        forecast_return_vals = None
        reconstructed_prices = None
        if quantity_l == 'return':
            forecast_return_vals = np.asarray(forecast_values, dtype=float)
            if np.isfinite(last_close):
                reconstructed_prices = last_close * np.exp(np.cumsum(forecast_return_vals))

        # Format and return the output
        denoise_used = dn_spec_used is not None
        result = _format_forecast_output(
            forecast_values, last_epoch, tf_secs, horizon, base_col, df,
            ci_alpha, ci_values, method, quantity_l, denoise_used, metadata,
            digits=digits,
            forecast_return_values=forecast_return_vals,
            reconstructed_prices=reconstructed_prices,
        )
        if method_l == 'ensemble' and ensemble_meta:
            result['ensemble'] = ensemble_meta
        return result
    except Exception as e:
        return {"error": f"Forecast engine failed: {str(e)}"}
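
A minimal usage sketch (illustrative: the import path, symbol, and parameter values are assumptions; it presumes an initialized MetaTrader 5 terminal and the mtdata package importable as in the file above):

# Hypothetical call site; the module path is assumed from the imports above.
from mtdata.forecast.forecast_engine import forecast_engine

result = forecast_engine(
    symbol="EURUSD",   # hypothetical symbol
    timeframe="H1",
    method="theta",
    horizon=12,
    ci_alpha=0.05,
)
if "error" in result:
    print("forecast failed:", result["error"])
else:
    # With the default quantity='price', the result exposes forecast_price
    # alongside the formatted forecast_time values.
    for t, price in zip(result["forecast_time"], result["forecast_price"]):
        print(t, price)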
