Skip to main content
Glama
emerzon

MetaTrader5 MCP Server

by emerzon
forecast.py35 kB
from typing import Any, Dict, Optional, List, Literal from datetime import datetime import os import json import math import numpy as np import pandas as pd import MetaTrader5 as mt5 import warnings # Adopt upcoming StatsForecast DataFrame format to avoid repeated warnings os.environ.setdefault("NIXTLA_ID_AS_COL", "1") from ..core.constants import TIMEFRAME_MAP, TIMEFRAME_SECONDS from ..utils.mt5 import _mt5_epoch_to_utc, _mt5_copy_rates_from, _ensure_symbol_ready from ..utils.utils import ( _parse_start_datetime as _parse_start_datetime_util, _format_time_minimal as _format_time_minimal_util, _format_time_minimal_local as _format_time_minimal_local_util, _use_client_tz as _use_client_tz_util, ) from ..utils.indicators import _parse_ti_specs as _parse_ti_specs_util, _apply_ta_indicators as _apply_ta_indicators_util from ..utils.denoise import _apply_denoise, normalize_denoise_spec as _normalize_denoise_spec from .common import ( parse_kv_or_json as _parse_kv_or_json, fetch_history as _fetch_history, ) # Removed invalid import: from .registry import get_forecast_methods_data from .helpers import ( default_seasonality_period as _default_seasonality_period, next_times_from_last as _next_times_from_last, pd_freq_from_timeframe as _pd_freq_from_timeframe, ) from .target_builder import build_target_series, aggregate_horizon_target from .forecast_methods import get_forecast_methods_data # Simple dimred factory used by the wrapper when building exogenous features. def _create_dimred_reducer(method: Any, params: Optional[Dict[str, Any]]) -> Any: try: from sklearn.decomposition import PCA from sklearn.manifold import TSNE from sklearn.feature_selection import SelectKBest, f_regression except Exception as ex: raise RuntimeError(f"dimred dependencies missing: {ex}") m = str(method).lower().strip() p = params or {} if m == 'pca': n_components = p.get('n_components', None) return PCA(n_components=n_components), {"n_components": n_components} if m == 'tsne': n_components = p.get('n_components', 2) return TSNE(n_components=n_components, random_state=42), {"n_components": n_components} if m == 'selectkbest': k = p.get('k', 5) selector = SelectKBest(score_func=f_regression, k=k) return selector, {"k": k} # Identity fallback to avoid crashes; caller already wraps in try/except. class _Identity: def fit_transform(self, X): return X return _Identity(), {"method": "identity"} # Removed unused imports of specific method implementations # Logic is now handled by forecast_engine via registry # Local fallbacks for typing aliases used in signatures (avoid import cycle) try: from ..core.server import ForecastMethodLiteral, TimeframeLiteral, DenoiseSpec # type: ignore except Exception: # runtime fallback ForecastMethodLiteral = str # type: ignore TimeframeLiteral = str # type: ignore DenoiseSpec = Dict[str, Any] # type: ignore # Optional availability flags and lazy imports following server logic # (Kept for backward compatibility if anything relies on these flags, though mostly unused now) try: from statsmodels.tsa.holtwinters import SimpleExpSmoothing as _SES, ExponentialSmoothing as _ETS # type: ignore _SM_ETS_AVAILABLE = True except Exception: _SM_ETS_AVAILABLE = False _SES = _ETS = None # type: ignore # ... (other availability checks can remain or be cleaned up, keeping for safety) ... def _default_seasonality_period(timeframe: str) -> int: from .common import default_seasonality return int(default_seasonality(timeframe)) def _next_times_from_last(last_epoch: float, tf_secs: int, horizon: int) -> List[float]: from .common import next_times_from_last return next_times_from_last(last_epoch, tf_secs, horizon) def _pd_freq_from_timeframe(tf: str) -> str: from .common import pd_freq_from_timeframe return pd_freq_from_timeframe(tf) def forecast( symbol: str, timeframe: TimeframeLiteral = "H1", method: ForecastMethodLiteral = "theta", horizon: int = 12, lookback: Optional[int] = None, as_of: Optional[str] = None, params: Optional[Dict[str, Any]] = None, ci_alpha: Optional[float] = 0.05, quantity: Literal['price','return','volatility'] = 'price', # type: ignore target: Literal['price','return'] = 'price', # deprecated in favor of quantity for modeling scale denoise: Optional[DenoiseSpec] = None, # Feature engineering for exogenous/multivariate models features: Optional[Dict[str, Any]] = None, # Optional dimensionality reduction across feature columns (overrides features.dimred_* if set) dimred_method: Optional[str] = None, dimred_params: Optional[Dict[str, Any]] = None, # Custom target specification (base column/alias, transform, and horizon aggregation) target_spec: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Fast forecasts for the next `horizon` bars using lightweight methods. Parameters: symbol, timeframe, method, horizon, lookback?, as_of?, params?, ci_alpha?, target, denoise? Methods: naive, seasonal_naive, drift, theta, fourier_ols, ses, holt, holt_winters_add, holt_winters_mul, arima, sarima. - `params`: method-specific settings; use `seasonality` inside params when needed (auto if omitted). - `target`: 'price' or 'return' (log-return). Price forecasts operate on close prices. - `ci_alpha`: confidence level (e.g., 0.05). Set to null to disable intervals. - `features`: Dict or "key=value" string for feature engineering. - `include`: List of columns to include (e.g., "open,high"). - `future_covariates`: List of date-based features to generate for future horizon. Supported tokens: `hour`, `dow` (day of week), `month`, `day`, `doy` (day of year), `week`, `minute`, `mod` (minute of day), `is_weekend`, `is_holiday`. For `is_holiday`, specify `country` in features (default: US). - `dimred_method`: Dimensionality reduction method (e.g., "pca"). """ try: if timeframe not in TIMEFRAME_MAP: return {"error": f"Invalid timeframe: {timeframe}. Valid options: {list(TIMEFRAME_MAP.keys())}"} mt5_tf = TIMEFRAME_MAP[timeframe] tf_secs = TIMEFRAME_SECONDS.get(timeframe) if not tf_secs: return {"error": f"Unsupported timeframe seconds for {timeframe}"} method_l = str(method).lower().strip() quantity_l = str(quantity).lower().strip() # Volatility models have a dedicated endpoint; keep forecast focused on price/return if quantity_l == 'volatility' or method_l.startswith('vol_'): return {"error": "Use forecast_volatility for volatility models"} # Parse method params via shared helper from .common import parse_kv_or_json as _parse_kv_or_json # local import to avoid cycles p = _parse_kv_or_json(params) # Prefer explicit seasonality inside params; otherwise auto by timeframe m = int(p.get('seasonality')) if p.get('seasonality') is not None else _default_seasonality_period(timeframe) if method_l == 'seasonal_naive' and (not m or m <= 0): return {"error": "seasonal_naive requires a positive 'seasonality' in params or auto period"} # Determine lookback bars to fetch (robust to string input) lb = None try: if lookback is not None: lb = int(lookback) # CLI may pass strings; coerce except Exception: lb = None if lb is not None and lb > 0: need = int(lb) + 2 else: if method_l == 'seasonal_naive': need = max(3 * m, int(horizon) + m + 2) elif method_l in ('theta', 'fourier_ols'): need = max(300, int(horizon) + (2 * m if m else 50)) else: # naive, drift and others need = max(100, int(horizon) + 10) # Fetch via shared helper (normalizes UTC time and drops live last bar) _info_before = mt5.symbol_info(symbol) try: df = _fetch_history(symbol, timeframe, int(need), as_of) except Exception as ex: return {"error": str(ex)} if len(df) < 3: return {"error": "Not enough closed bars to compute forecast"} # Optionally denoise base_col = 'close' dn_spec_used = None if denoise: try: _dn = _normalize_denoise_spec(denoise, default_when='pre_ti') except Exception: _dn = None added = _apply_denoise(df, _dn, default_when='pre_ti') if _dn else [] dn_spec_used = _dn if len(added) > 0 and f"{base_col}_dn" in added: base_col = f"{base_col}_dn" # Build target series: support custom target_spec or legacy target/quantity t = np.arange(1, len(df) + 1, dtype=float) last_time = float(df['time'].iloc[-1]) future_times = _next_times_from_last(last_time, int(tf_secs), int(horizon)) __stage = 'target_build' custom_target_mode = False target_info: Dict[str, Any] = {} # Helper to resolve alias base columns def _alias_base(arrs: Dict[str, np.ndarray], name: str) -> Optional[np.ndarray]: nm = name.strip().lower() if nm in ('typical','tp'): if all(k in arrs for k in ('high','low','close')): return (arrs['high'] + arrs['low'] + arrs['close']) / 3.0 return None if nm in ('hl2',): if all(k in arrs for k in ('high','low')): return (arrs['high'] + arrs['low']) / 2.0 return None if nm in ('ohlc4','ha_close','haclose'): if all(k in arrs for k in ('open','high','low','close')): return (arrs['open'] + arrs['high'] + arrs['low'] + arrs['close']) / 4.0 return None return None # Resolve base and transform from target_spec when provided if target_spec and isinstance(target_spec, dict): custom_target_mode = True ts = dict(target_spec) # Compute indicators if requested so 'base' can reference them ts_inds = ts.get('indicators') if ts_inds: try: specs = _parse_ti_specs_util(str(ts_inds)) if isinstance(ts_inds, str) else ts_inds _apply_ta_indicators_util(df, specs, default_when='pre_ti') except Exception: pass base_name = str(ts.get('base', base_col)) # Resolve base series if base_name in df.columns: y_base = df[base_name].astype(float).to_numpy() else: # Attempt alias arrs = {c: df[c].astype(float).to_numpy() for c in df.columns if c in ('open','high','low','close','volume')} y_alias = _alias_base(arrs, base_name) if y_alias is None: # Fallback to default base_col y_base = df[base_col].astype(float).to_numpy() else: y_base = np.asarray(y_alias, dtype=float) target_info['base'] = base_name # Transform transform = str(ts.get('transform', 'none')).lower() k_trans = int(ts.get('k', 1)) if ts.get('k') is not None else 1 if transform in ('return','log_return','diff','pct_change'): # general k-step transform if k_trans < 1: k_trans = 1 if transform == 'log_return': with np.errstate(divide='ignore', invalid='ignore'): y_shift = np.roll(np.log(np.maximum(y_base, 1e-12)), k_trans) series = np.log(np.maximum(y_base, 1e-12)) - y_shift elif transform == 'return' or transform == 'pct_change': with np.errstate(divide='ignore', invalid='ignore'): y_shift = np.roll(y_base, k_trans) series = (y_base - y_shift) / np.where(np.abs(y_shift) > 1e-12, y_shift, 1.0) if transform == 'pct_change': series = 100.0 * series else: # diff y_shift = np.roll(y_base, k_trans) series = y_base - y_shift # Drop first k rows for valid transform series = np.asarray(series[k_trans:], dtype=float) series = series[np.isfinite(series)] if series.size < 5: return {"error": "Not enough data for transformed target"} target_info['transform'] = transform target_info['k'] = k_trans else: series = np.asarray(y_base, dtype=float) series = series[np.isfinite(series)] if series.size < 3: return {"error": "Not enough data for target"} target_info['transform'] = 'none' # Since custom target can be any series, skip legacy price/return mapping use_returns = False origin_price = float('nan') else: # Legacy target behavior: price vs return on close y = df[base_col].astype(float).to_numpy() # Decide modeling scale for price/return use_returns = (quantity_l == 'return') or (str(target).lower() == 'return') if use_returns: with np.errstate(divide='ignore', invalid='ignore'): x = np.diff(np.log(np.maximum(y, 1e-12))) x = x[np.isfinite(x)] if x.size < 5: return {"error": "Not enough data to compute return-based forecast"} series = x origin_price = float(y[-1]) else: series = y origin_price = float(y[-1]) # Ensure finite numeric series for modeling series = np.asarray(series, dtype=float) series = series[np.isfinite(series)] n = len(series) if n < 3: return {"error": "Series too short for forecasting"} # ---- Optional feature engineering for exogenous models ---- exog_used: Optional[np.ndarray] = None exog_future: Optional[np.ndarray] = None feat_info: Dict[str, Any] = {} __stage = 'features_start' if features: try: # Accept dict, JSON string, or key=value pairs if isinstance(features, dict): fcfg = dict(features) elif isinstance(features, str): s = features.strip() if (s.startswith('{') and s.endswith('}')): try: fcfg = json.loads(s) except Exception: # Fallback: parse colon/equals pairs inside braces fcfg = {} toks = [tok for tok in s.strip().strip('{}').split() if tok] i = 0 while i < len(toks): tok = toks[i].strip().strip(',') if not tok: i += 1; continue if '=' in tok: k, v = tok.split('=', 1) fcfg[k.strip()] = v.strip().strip(',') i += 1; continue if tok.endswith(':'): key = tok[:-1].strip() val = '' if i + 1 < len(toks): val = toks[i+1].strip().strip(',') i += 2 else: i += 1 fcfg[key] = val continue i += 1 else: # Parse k=v or k: v pairs split on whitespace fcfg = {} toks = [tok for tok in s.split() if tok] i = 0 while i < len(toks): tok = toks[i].strip().strip(',') if '=' in tok: k, v = tok.split('=', 1) fcfg[k.strip()] = v.strip() i += 1; continue if tok.endswith(':'): key = tok[:-1].strip() val = '' if i + 1 < len(toks): val = toks[i+1].strip().strip(',') i += 2 else: i += 1 fcfg[key] = val continue i += 1 else: fcfg = {} include = fcfg.get('include', 'ohlcv') include_cols: list[str] = [] if isinstance(include, str): inc = include.strip().lower() if inc == 'ohlcv': for col in ('open','high','low','volume','tick_volume','real_volume'): if col in df.columns: include_cols.append(col) else: # comma/space separated list toks = [tok.strip() for tok in include.replace(',', ' ').split() if tok.strip()] for tok in toks: if tok in df.columns and tok not in ('time','close'): include_cols.append(tok) elif isinstance(include, (list, tuple)): for tok in include: s = str(tok).strip() if s in df.columns and s not in ('time','close'): include_cols.append(s) # Indicators (add columns) __stage = 'features_indicators' ind_specs = fcfg.get('indicators') if ind_specs: try: specs = _parse_ti_specs_util(str(ind_specs)) if isinstance(ind_specs, str) else ind_specs _apply_ta_indicators_util(df, specs, default_when='pre_ti') except Exception: pass # Add any newly created indicator columns (heuristic: non-time, non-OHLCV) __stage = 'features_collect' ti_cols = [] for c in df.columns: if c in ('time','open','high','low','close','volume','tick_volume','real_volume'): continue if df[c].dtype.kind in ('f','i'): ti_cols.append(c) # Calendar/future-known covariates (hour, dow, fourier:P) cal_cols: list[str] = [] cal_train: Optional[np.ndarray] = None cal_future: Optional[np.ndarray] = None fut_cov = fcfg.get('future_covariates') if fut_cov: tokens: list[str] = [] if isinstance(fut_cov, str): tokens = [tok.strip() for tok in fut_cov.replace(',', ' ').split() if tok.strip()] elif isinstance(fut_cov, (list, tuple)): tokens = [str(tok).strip() for tok in fut_cov] # Lazy DT index loading dt_train = None dt_future = None def _ensure_dt(): nonlocal dt_train, dt_future if dt_train is None: try: dt_train = pd.to_datetime(df['time'].astype(float).to_numpy(), unit='s', utc=True) except Exception: dt_train = pd.Index([]) if dt_future is None: try: t_future = np.asarray(future_times, dtype=float) dt_future = pd.to_datetime(t_future, unit='s', utc=True) except Exception: dt_future = pd.Index([]) tr_list: list[np.ndarray] = [] tf_list: list[np.ndarray] = [] for tok in tokens: tl = tok.lower() # Fourier terms if tl.startswith('fourier:'): try: per = int(tl.split(':',1)[1]) except Exception: per = 24 w = 2.0 * math.pi / float(max(1, per)) # No need for datetime index for fourier on index idx_tr = np.arange(len(df), dtype=float) idx_tf = np.arange(len(future_times), dtype=float) tr_list.append(np.sin(w * idx_tr)); cal_cols.append(f'fx_sin_{per}') tr_list.append(np.cos(w * idx_tr)); cal_cols.append(f'fx_cos_{per}') tf_list.append(np.sin(w * idx_tf)); tf_list.append(np.cos(w * idx_tf)); continue # Ensure DT indices are ready for date-based features _ensure_dt() if dt_train is None or dt_future is None: # Should not happen continue if tl in ('hour','hr'): vals_tr = dt_train.hour.to_numpy() vals_tf = dt_future.hour.to_numpy() w = 2.0 * math.pi / 24.0 tr_list.append(np.sin(w * vals_tr)); cal_cols.append('hr_sin') tr_list.append(np.cos(w * vals_tr)); cal_cols.append('hr_cos') tf_list.append(np.sin(w * vals_tf)); tf_list.append(np.cos(w * vals_tf)); elif tl in ('dow','wday','weekday'): vals_tr = dt_train.weekday.to_numpy() vals_tf = dt_future.weekday.to_numpy() w = 2.0 * math.pi / 7.0 tr_list.append(np.sin(w * vals_tr)); cal_cols.append('dow_sin') tr_list.append(np.cos(w * vals_tr)); cal_cols.append('dow_cos') tf_list.append(np.sin(w * vals_tf)); tf_list.append(np.cos(w * vals_tf)); elif tl in ('month', 'mo'): # Month 1-12 -> 0-11 for cyclic vals_tr = dt_train.month.to_numpy() - 1 vals_tf = dt_future.month.to_numpy() - 1 w = 2.0 * math.pi / 12.0 tr_list.append(np.sin(w * vals_tr)); cal_cols.append('mo_sin') tr_list.append(np.cos(w * vals_tr)); cal_cols.append('mo_cos') tf_list.append(np.sin(w * vals_tf)); tf_list.append(np.cos(w * vals_tf)); elif tl in ('day', 'dom'): # Day 1-31 -> 0-30 vals_tr = dt_train.day.to_numpy() - 1 vals_tf = dt_future.day.to_numpy() - 1 w = 2.0 * math.pi / 31.0 tr_list.append(np.sin(w * vals_tr)); cal_cols.append('dom_sin') tr_list.append(np.cos(w * vals_tr)); cal_cols.append('dom_cos') tf_list.append(np.sin(w * vals_tf)); tf_list.append(np.cos(w * vals_tf)); elif tl in ('doy', 'dayofyear'): # Day of year 1-366 -> 0-365 vals_tr = dt_train.dayofyear.to_numpy() - 1 vals_tf = dt_future.dayofyear.to_numpy() - 1 w = 2.0 * math.pi / 365.25 tr_list.append(np.sin(w * vals_tr)); cal_cols.append('doy_sin') tr_list.append(np.cos(w * vals_tr)); cal_cols.append('doy_cos') tf_list.append(np.sin(w * vals_tf)); tf_list.append(np.cos(w * vals_tf)); elif tl in ('week', 'woy'): # Week 1-53 -> 0-52. isocalendar().week returns UInt32, need cast vals_tr = dt_train.isocalendar().week.to_numpy().astype(float) - 1 vals_tf = dt_future.isocalendar().week.to_numpy().astype(float) - 1 w = 2.0 * math.pi / 52.143 # 365/7 tr_list.append(np.sin(w * vals_tr)); cal_cols.append('woy_sin') tr_list.append(np.cos(w * vals_tr)); cal_cols.append('woy_cos') tf_list.append(np.sin(w * vals_tf)); tf_list.append(np.cos(w * vals_tf)); elif tl in ('minute', 'min'): # Minute 0-59 vals_tr = dt_train.minute.to_numpy() vals_tf = dt_future.minute.to_numpy() w = 2.0 * math.pi / 60.0 tr_list.append(np.sin(w * vals_tr)); cal_cols.append('min_sin') tr_list.append(np.cos(w * vals_tr)); cal_cols.append('min_cos') tf_list.append(np.sin(w * vals_tf)); tf_list.append(np.cos(w * vals_tf)); elif tl in ('mod', 'minute_of_day'): # Minute of day 0-1439 vals_tr = dt_train.hour.to_numpy() * 60 + dt_train.minute.to_numpy() vals_tf = dt_future.hour.to_numpy() * 60 + dt_future.minute.to_numpy() w = 2.0 * math.pi / 1440.0 tr_list.append(np.sin(w * vals_tr)); cal_cols.append('mod_sin') tr_list.append(np.cos(w * vals_tr)); cal_cols.append('mod_cos') tf_list.append(np.sin(w * vals_tf)); tf_list.append(np.cos(w * vals_tf)); elif tl in ('is_weekend', 'weekend'): # 0 or 1 vals_tr = (dt_train.weekday >= 5).astype(float) vals_tf = (dt_future.weekday >= 5).astype(float) tr_list.append(vals_tr); cal_cols.append('is_weekend') tf_list.append(vals_tf); elif tl in ('is_holiday', 'holiday'): try: import holidays country = fcfg.get('country', 'US') # Gather years years_tr = dt_train.year.unique() years_tf = dt_future.year.unique() all_years = np.unique(np.concatenate([years_tr, years_tf])) # Init calendar hol_cal = holidays.CountryHoliday(country, years=all_years) # Map dates # is_holiday check on datetime/date objects # dt_train is DatetimeIndex vals_tr = np.array([1.0 if d in hol_cal else 0.0 for d in dt_train], dtype=float) vals_tf = np.array([1.0 if d in hol_cal else 0.0 for d in dt_future], dtype=float) tr_list.append(vals_tr); cal_cols.append('is_holiday') tf_list.append(vals_tf); except Exception: pass # Ignore if holidays lib missing or country invalid if tr_list: cal_train = np.vstack(tr_list).T.astype(float) cal_future = np.vstack(tf_list).T.astype(float) sel_cols = sorted(set(include_cols + ti_cols)) __stage = 'features_matrix' if sel_cols: X = df[sel_cols].astype(float).copy() # Fill missing values conservatively (ffill then bfill) X = X.replace([np.inf, -np.inf], np.nan) X = X.ffill().bfill() X_arr = X.to_numpy(dtype=float) # Dimensionality reduction across feature columns dr_method = (fcfg.get('dimred_method') or dimred_method) dr_params = fcfg.get('dimred_params') or dimred_params if dr_method and str(dr_method).lower() not in ('', 'none'): try: reducer, _ = _create_dimred_reducer(dr_method, dr_params) X_red = reducer.fit_transform(X_arr) exog = np.asarray(X_red, dtype=float) feat_info['dimred_method'] = str(dr_method) if isinstance(dr_params, dict): feat_info['dimred_params'] = dr_params elif dr_params is None: feat_info['dimred_params'] = {} else: feat_info['dimred_params'] = {"raw": str(dr_params)} feat_info['dimred_n_features'] = int(exog.shape[1]) except Exception as _ex: # Fallback to raw features on failure exog = X_arr feat_info['dimred_error'] = str(_ex) else: exog = X_arr # Save raw exog for future generation (before appending calendar features) exog_raw = exog # Append calendar features if cal_train is not None: exog = np.hstack([exog, cal_train]) if exog.size else cal_train # Align with return series if needed if (quantity_l == 'return') or (str(target).lower() == 'return'): exog = exog[1:] if exog_raw.size > 0: exog_raw = exog_raw[1:] # Build future exog by holding the last observed row of raw features + future calendar features exog_f = None if exog_raw.size > 0: last_row = exog_raw[-1] exog_f = np.tile(last_row.reshape(1, -1), (int(horizon), 1)) if exog_f is not None and cal_future is not None: exog_f = np.hstack([exog_f, cal_future]) elif exog_f is None and cal_future is not None: exog_f = cal_future exog_used = exog exog_future = exog_f feat_info['selected_columns'] = sel_cols + cal_cols feat_info['n_features'] = int(exog_used.shape[1]) if exog_used is not None else 0 else: feat_info['selected_columns'] = [] except Exception as _ex: feat_info['error'] = f"feature_build_error: {str(_ex)}" __stage = 'features_error' # Volatility branch: compute and return volatility metrics __stage = 'quantity_branch' if quantity_l == 'volatility': mt5_tf = TIMEFRAME_MAP[timeframe] # Use the dedicated volatility engine from .volatility import forecast_volatility return forecast_volatility( symbol=symbol, timeframe=timeframe, horizon=horizon, method=method, params=params, as_of=as_of ) # Use the unified forecast engine from .forecast_engine import forecast_engine # Map legacy arguments to engine arguments engine_params = params or {} # Inject context for methods that need it (like analog) engine_params['symbol'] = symbol engine_params['timeframe'] = timeframe # Call engine result = forecast_engine( symbol=symbol, timeframe=timeframe, method=method, horizon=horizon, lookback=lookback, as_of=as_of, params=engine_params, ci_alpha=ci_alpha, quantity=quantity, target=target, denoise=denoise, features=features, dimred_method=dimred_method, dimred_params=dimred_params, target_spec=target_spec, exog_used=exog_used, exog_future=exog_future, prefetched_df=df, prefetched_base_col=base_col, prefetched_denoise_spec=dn_spec_used, ) if "error" in result: return result # Add legacy fields if missing (for backward compatibility with CLI/API consumers) if "forecast" not in result: if "forecast_price" in result: result["forecast"] = result["forecast_price"] elif "forecast_return" in result: result["forecast"] = result["forecast_return"] return result except Exception as e: import traceback return {"error": f"Forecast failed: {str(e)}", "traceback": traceback.format_exc()}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/emerzon/mt-data-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server