Skip to main content
Glama
emerzon

MetaTrader5 MCP Server

by emerzon
forecast_preprocessing.py11.7 kB
""" Forecast data preprocessing and feature engineering. Handles data preparation, feature creation, and input validation. """ from typing import Any, Dict, Optional, List, Tuple import math import numpy as np import pandas as pd from ..core.constants import TIMEFRAME_SECONDS from ..utils.indicators import _parse_ti_specs as _parse_ti_specs_util, _apply_ta_indicators as _apply_ta_indicators_util from ..utils.denoise import _apply_denoise, normalize_denoise_spec as _normalize_denoise_spec from .common import parse_kv_or_json as _parse_kv_or_json def _default_seasonality_period(timeframe: str) -> int: """Infer a default seasonality for forecasting based on timeframe.""" try: sec = TIMEFRAME_SECONDS.get(timeframe) if not sec or sec <= 0: return 0 if sec < 86400: return int(round(86400.0 / float(sec))) if timeframe == 'D1': return 5 if timeframe == 'W1': return 52 if timeframe == 'MN1': return 12 return 0 except Exception: return 0 def _next_times_from_last(last_epoch: float, tf_secs: int, horizon: int) -> List[float]: """Generate future timestamps for forecasting.""" base = float(last_epoch) step = float(tf_secs) return [base + step * (i + 1) for i in range(int(horizon))] def _pd_freq_from_timeframe(tf: str) -> str: """Convert MT5 timeframe to pandas frequency string.""" tf = str(tf).upper().strip() if tf == 'M1': return '1min' if tf in ('M2', 'M3', 'M4', 'M5', 'M6', 'M10', 'M12', 'M15', 'M20', 'M30'): return f"{tf[1:]}min" if tf == 'H1': return '1H' if tf in ('H2', 'H3', 'H4', 'H6', 'H8', 'H12'): return f"{tf[1:]}H" if tf == 'D1': return '1D' if tf == 'W1': return '1W' if tf == 'MN1': return '1M' return '1H' # fallback def prepare_features( df: pd.DataFrame, features_cfg: Optional[Dict[str, Any]], future_times: List[float], n: int ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], List[str], List[str]]: """Prepare exogenous features for forecasting. 
Returns: - exog_used: Training features array - exog_future: Future features array - include_cols: Column names to include as features - ti_cols: Technical indicator column names """ if not features_cfg: return None, None, [], [] try: # Parse features configuration if isinstance(features_cfg, str): # Try JSON first, then k=v format s = features_cfg.strip() if s.startswith('{') and s.endswith('}'): try: import json fcfg = json.loads(s) except Exception: fcfg = _parse_kv_or_json(s) else: fcfg = _parse_kv_or_json(s) elif isinstance(features_cfg, dict): fcfg = dict(features_cfg) else: fcfg = {} # Process feature inclusion specification include_cols = _process_include_specification(df, fcfg) # Add technical indicators ti_cols = _add_technical_indicators(df, fcfg) # Add calendar/future covariates cal_train, cal_future, cal_cols = _add_calendar_features(df, fcfg, future_times) # Build final feature arrays exog_used, exog_future = _build_feature_arrays( df, include_cols, ti_cols, cal_train, cal_future, cal_cols, n ) return exog_used, exog_future, include_cols, ti_cols except Exception: return None, None, [], [] def _process_include_specification(df: pd.DataFrame, fcfg: Dict[str, Any]) -> List[str]: """Process the 'include' specification for feature columns.""" include = fcfg.get('include', 'ohlcv') include_cols: List[str] = [] if isinstance(include, str): inc = include.strip().lower() if inc == 'ohlcv': for col in ('open','high','low','volume','tick_volume','real_volume'): if col in df.columns: include_cols.append(col) else: # comma/space separated list toks = [tok.strip() for tok in include.replace(',', ' ').split() if tok.strip()] for tok in toks: if tok in df.columns and tok not in ('time','close'): include_cols.append(tok) elif isinstance(include, (list, tuple)): for tok in include: s = str(tok).strip() if s in df.columns and s not in ('time','close'): include_cols.append(s) return include_cols def _add_technical_indicators(df: pd.DataFrame, fcfg: Dict[str, Any]) -> 
List[str]: """Add technical indicators to the DataFrame.""" ind_specs = fcfg.get('indicators') if not ind_specs: return [] try: specs = _parse_ti_specs_util(str(ind_specs)) if isinstance(ind_specs, str) else ind_specs _apply_ta_indicators_util(df, specs, default_when='pre_ti') except Exception: pass # Collect newly created indicator columns ti_cols = [] for c in df.columns: if c in ('time','open','high','low','close','volume','tick_volume','real_volume'): continue if df[c].dtype.kind in ('f','i'): ti_cols.append(c) return ti_cols def _add_calendar_features( df: pd.DataFrame, fcfg: Dict[str, Any], future_times: List[float] ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], List[str]]: """Add calendar and future-known covariates.""" fut_cov = fcfg.get('future_covariates') if not fut_cov: return None, None, [] tokens: List[str] = [] if isinstance(fut_cov, str): tokens = [tok.strip() for tok in fut_cov.replace(',', ' ').split() if tok.strip()] elif isinstance(fut_cov, (list, tuple)): tokens = [str(tok).strip() for tok in fut_cov] if not tokens: return None, None, [] t_train = df['time'].astype(float).to_numpy() t_future = np.asarray(future_times, dtype=float) tr_list: List[np.ndarray] = [] tf_list: List[np.ndarray] = [] cal_cols: List[str] = [] for tok in tokens: tl = tok.lower() if tl.startswith('fourier:'): tr_f, tf_f, cols_f = _create_fourier_features(tl, t_train, t_future) tr_list.extend(tr_f) tf_list.extend(tf_f) cal_cols.extend(cols_f) elif tl in ('hour','hr'): tr_h, tf_h = _create_hour_features(t_train, t_future) if tr_h is not None: tr_list.append(tr_h) tf_list.append(tf_h) cal_cols.append('hour') elif tl in ('dow','dayofweek'): tr_d, tf_d = _create_dow_features(t_train, t_future) if tr_d is not None: tr_list.append(tr_d) tf_list.append(tf_d) cal_cols.append('dow') if not tr_list: return None, None, [] cal_train = np.column_stack(tr_list) if len(tr_list) > 1 else tr_list[0].reshape(-1, 1) cal_future = np.column_stack(tf_list) if len(tf_list) > 1 else 
tf_list[0].reshape(-1, 1) return cal_train, cal_future, cal_cols def _create_fourier_features( token: str, t_train: np.ndarray, t_future: np.ndarray ) -> Tuple[List[np.ndarray], List[np.ndarray], List[str]]: """Create Fourier features.""" try: per = int(token.split(':',1)[1]) except Exception: per = 24 w = 2.0 * math.pi / float(max(1, per)) idx_tr = np.arange(t_train.size, dtype=float) idx_tf = np.arange(t_future.size, dtype=float) tr_features = [np.sin(w * idx_tr), np.cos(w * idx_tr)] tf_features = [np.sin(w * idx_tf), np.cos(w * idx_tf)] col_names = [f'fx_sin_{per}', f'fx_cos_{per}'] return tr_features, tf_features, col_names def _create_hour_features( t_train: np.ndarray, t_future: np.ndarray ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: """Create hour-of-day features.""" try: hrs_tr = pd.to_datetime(t_train, unit='s', utc=True).hour.to_numpy() hrs_tf = pd.to_datetime(t_future, unit='s', utc=True).hour.to_numpy() return hrs_tr.astype(float), hrs_tf.astype(float) except Exception: return None, None def _create_dow_features( t_train: np.ndarray, t_future: np.ndarray ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: """Create day-of-week features.""" try: dow_tr = pd.to_datetime(t_train, unit='s', utc=True).dayofweek.to_numpy() dow_tf = pd.to_datetime(t_future, unit='s', utc=True).dayofweek.to_numpy() return dow_tr.astype(float), dow_tf.astype(float) except Exception: return None, None def _build_feature_arrays( df: pd.DataFrame, include_cols: List[str], ti_cols: List[str], cal_train: Optional[np.ndarray], cal_future: Optional[np.ndarray], cal_cols: List[str], n: int ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: """Build final exogenous feature arrays.""" # Combine all feature columns all_cols = include_cols + ti_cols if not all_cols and cal_train is None: return None, None arrays_tr: List[np.ndarray] = [] arrays_tf: List[np.ndarray] = [] # Add DataFrame columns if all_cols: for col in all_cols: if col in df.columns: 
arrays_tr.append(df[col].to_numpy().astype(float)) # For future, repeat the last value last_val = float(df[col].iloc[-1]) if len(df) > 0 else 0.0 arrays_tf.append(np.full(n, last_val, dtype=float)) # Add calendar features if cal_train is not None and cal_future is not None: if cal_train.ndim == 1: arrays_tr.append(cal_train) arrays_tf.append(cal_future) else: for i in range(cal_train.shape[1]): arrays_tr.append(cal_train[:, i]) arrays_tf.append(cal_future[:, i]) if not arrays_tr: return None, None exog_used = np.column_stack(arrays_tr) if len(arrays_tr) > 1 else arrays_tr[0].reshape(-1, 1) exog_future = np.column_stack(arrays_tf) if len(arrays_tf) > 1 else arrays_tf[0].reshape(-1, 1) return exog_used, exog_future def apply_preprocessing( df: pd.DataFrame, quantity: str, target: str, denoise: Optional[Dict[str, Any]] ) -> str: """Apply preprocessing transformations to prepare data for forecasting. Returns the name of the column to use for forecasting. """ # Apply denoising if specified if denoise: try: denoise_spec = _normalize_denoise_spec(denoise) _apply_denoise(df, denoise_spec, default_when='pre_ti') except Exception: pass # Determine base column based on quantity and target if quantity == 'volatility': base_col = 'close' # Will compute volatility from returns elif quantity == 'return': base_col = 'close' # Will compute returns else: # price if target == 'return': base_col = 'close' # Will compute returns from price else: base_col = 'close' # Direct price forecasting return base_col __all__ = [ '_default_seasonality_period', '_next_times_from_last', '_pd_freq_from_timeframe', 'prepare_features', 'apply_preprocessing' ]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/emerzon/mt-data-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.