Skip to main content
Glama
emerzon

MetaTrader5 MCP Server

by emerzon
barriers.py (33 kB)
from typing import Any, Dict, Optional, List, Literal, Tuple, Set, Union
import numpy as np
import MetaTrader5 as mt5
from ..core.schema import TimeframeLiteral, DenoiseSpec
from ..core.constants import TIMEFRAME_SECONDS
from .common import fetch_history as _fetch_history, parse_kv_or_json as _parse_kv_or_json
from .monte_carlo import (
    simulate_gbm_mc as _simulate_gbm_mc,
    simulate_hmm_mc as _simulate_hmm_mc,
    simulate_garch_mc as _simulate_garch_mc,
    simulate_bootstrap_mc as _simulate_bootstrap_mc,
    gbm_single_barrier_upcross_prob as _gbm_upcross_prob
)

# TP/SL grid presets keyed by trading style. Values are in "units" of the
# selected mode (percent when mode='pct'; pip counts are derived via a scale
# factor in forecast_barrier_optimize when mode='pips').
BARRIER_GRID_PRESETS = {
    'scalp': {
        'tp_min': 0.08, 'tp_max': 0.60, 'tp_steps': 7,
        'sl_min': 0.20, 'sl_max': 1.20, 'sl_steps': 7,
    },
    'intraday': {
        'tp_min': 0.25, 'tp_max': 1.50, 'tp_steps': 7,
        'sl_min': 0.25, 'sl_max': 2.50, 'sl_steps': 9,
    },
    'swing': {
        'tp_min': 0.60, 'tp_max': 3.50, 'tp_steps': 7,
        'sl_min': 0.50, 'sl_max': 4.50, 'sl_steps': 8,
    },
    'position': {
        'tp_min': 1.00, 'tp_max': 8.00, 'tp_steps': 8,
        'sl_min': 0.75, 'sl_max': 6.00, 'sl_steps': 8,
    },
}


def forecast_barrier_hit_probabilities(
    symbol: str,
    timeframe: TimeframeLiteral = "H1",
    horizon: int = 12,
    method: Literal['mc_gbm', 'hmm_mc', 'garch', 'bootstrap'] = 'hmm_mc',
    direction: Literal['long', 'short'] = 'long',
    tp_abs: Optional[float] = None,
    sl_abs: Optional[float] = None,
    tp_pct: Optional[float] = None,
    sl_pct: Optional[float] = None,
    tp_pips: Optional[float] = None,
    sl_pips: Optional[float] = None,
    params: Optional[Dict[str, Any]] = None,
    denoise: Optional[DenoiseSpec] = None,
) -> Dict[str, Any]:
    """Monte Carlo barrier analysis: probability of reaching TP/SL within horizon.

    Simulates ``horizon`` bars of prices with the chosen model, then measures
    which of the two barriers (take-profit / stop-loss) each simulated path
    touches first.  Barriers may be given as absolute prices (``tp_abs`` /
    ``sl_abs``), as percent offsets from the last close (``tp_pct`` /
    ``sl_pct``), or in pips (``tp_pips`` / ``sl_pips``); the first available
    form wins, checked in that order.

    Returns a dict with ``prob_tp_first`` / ``prob_sl_first`` (ties at the
    same bar are split 50/50 between them), ``prob_no_hit``, per-bar
    cumulative hit curves, and mean/median time-to-hit in bars and seconds.
    On any failure a ``{"error": ...}`` dict is returned instead of raising.
    """
    try:
        if timeframe not in TIMEFRAME_SECONDS:
            return {"error": f"Invalid timeframe: {timeframe}"}
        p = _parse_kv_or_json(params)
        # Fetch enough history for calibration
        need = int(max(300, horizon + 100))
        df = _fetch_history(symbol, timeframe, need, as_of=None)
        if len(df) < 10:
            return {"error": "Insufficient history for simulation"}
        # Current price baseline
        last_price = float(df['close'].astype(float).iloc[-1])
        # Resolve pip size (approximate): use 10*point for 5/3-digit FX, else 1*point
        pip_size = None
        try:
            info = mt5.symbol_info(symbol)
            if info is not None:
                digits = int(getattr(info, 'digits', 0) or 0)
                point = float(getattr(info, 'point', 0.0) or 0.0)
                pip_size = float(point * (10.0 if digits in (3, 5) else 1.0)) if point > 0 else None
        except Exception:
            # Terminal unavailable or symbol unknown: pips-based barriers simply
            # become unusable (pip_size stays None) rather than failing the call.
            pip_size = None

        def _coerce_float(v: Any) -> Optional[float]:
            # Lenient conversion: accepts numbers and numeric strings, None on failure.
            try:
                if v is None:
                    return None
                return float(str(v))
            except Exception:
                return None

        # Compute absolute TP/SL prices with explicit trade direction
        dir_long = str(direction).lower() == 'long'
        tp_price = _coerce_float(tp_abs)
        sl_price = _coerce_float(sl_abs)
        r_tp = _coerce_float(tp_pct)
        r_sl = _coerce_float(sl_pct)
        pp_tp = _coerce_float(tp_pips)
        pp_sl = _coerce_float(sl_pips)
        if tp_price is None or sl_price is None:
            # Derive from pct/pips if absolutes not provided
            if dir_long:
                if tp_price is None:
                    if r_tp is not None:
                        tp_price = last_price * (1.0 + (r_tp / 100.0))
                    elif pp_tp is not None and pip_size is not None:
                        tp_price = last_price + pp_tp * pip_size
                if sl_price is None:
                    if r_sl is not None:
                        sl_price = last_price * (1.0 - (r_sl / 100.0))
                    elif pp_sl is not None and pip_size is not None:
                        sl_price = last_price - pp_sl * pip_size
            else:  # short
                if tp_price is None:
                    if r_tp is not None:
                        tp_price = last_price * (1.0 - (r_tp / 100.0))
                    elif pp_tp is not None and pip_size is not None:
                        tp_price = last_price - pp_tp * pip_size
                if sl_price is None:
                    if r_sl is not None:
                        sl_price = last_price * (1.0 + (r_sl / 100.0))
                    elif pp_sl is not None and pip_size is not None:
                        sl_price = last_price + pp_sl * pip_size
        if tp_price is None or sl_price is None:
            return {"error": "Provide barriers via tp_abs/sl_abs or tp_pct/sl_pct or tp_pips/sl_pips"}
        # Ensure correct side relative to direction (adjust minimally if inverted)
        if dir_long:
            if tp_price <= last_price:
                tp_price = last_price * 1.000001
            if sl_price >= last_price:
                sl_price = last_price * 0.999999
        else:
            if tp_price >= last_price:
                tp_price = last_price * 0.999999
            if sl_price <= last_price:
                sl_price = last_price * 1.000001
        # Build input series (denoise optional)
        base_col = 'close'
        if denoise:
            try:
                from ..utils.denoise import _apply_denoise as _apply_denoise_util
                added = _apply_denoise_util(df, denoise, default_when='pre_ti')
                if f"{base_col}_dn" in added:
                    base_col = f"{base_col}_dn"
            except Exception:
                # Denoising is best-effort; fall back to the raw close series.
                pass
        prices = df[base_col].astype(float).to_numpy()
        # Simulate paths
        sims = int(p.get('n_sims', p.get('sims', 2000)) or 2000)
        seed = int(p.get('seed', 42) or 42)
        method_key = str(method).lower()
        if method_key == 'mc_gbm':
            sim = _simulate_gbm_mc(prices, horizon=int(horizon), n_sims=int(sims), seed=int(seed))
        elif method_key == 'hmm_mc':
            n_states = int(p.get('n_states', 2) or 2)
            sim = _simulate_hmm_mc(prices, horizon=int(horizon), n_states=int(n_states), n_sims=int(sims), seed=int(seed))
        elif method_key == 'garch':
            p_order = int(p.get('p', 1))
            q_order = int(p.get('q', 1))
            sim = _simulate_garch_mc(prices, horizon=int(horizon), n_sims=int(sims), seed=int(seed), p_order=p_order, q_order=q_order)
        elif method_key == 'bootstrap':
            bs = p.get('block_size')
            if bs:
                bs = int(bs)
            sim = _simulate_bootstrap_mc(prices, horizon=int(horizon), n_sims=int(sims), seed=int(seed), block_size=bs)
        else:
            return {"error": f"Unsupported method: {method}. Use 'mc_gbm', 'hmm_mc', 'garch', or 'bootstrap'"}
        price_paths = np.asarray(sim['price_paths'], dtype=float)
        # NOTE(review): assumes the simulators return paths shaped (n_sims, horizon)
        # with the start price excluded — confirm against monte_carlo module.
        S, H = price_paths.shape
        # Vectorized hit detection
        # hits_tp/sl: boolean (S, H)
        if dir_long:
            hits_tp = (price_paths >= tp_price)
            hits_sl = (price_paths <= sl_price)
        else:
            hits_tp = (price_paths <= tp_price)
            hits_sl = (price_paths >= sl_price)
        # Find first hit index (argmax returns 0 if none found, check any)
        idx_tp = np.argmax(hits_tp, axis=1)
        idx_sl = np.argmax(hits_sl, axis=1)
        any_tp = np.any(hits_tp, axis=1)
        any_sl = np.any(hits_sl, axis=1)
        # Set index to H (beyond horizon) if no hit
        idx_tp_val = np.where(any_tp, idx_tp, H)
        idx_sl_val = np.where(any_sl, idx_sl, H)
        # Determine outcomes
        # TP Win: TP hit detected AND (SL not hit OR TP hit before SL)
        tp_wins = (idx_tp_val < idx_sl_val)
        # SL Win: SL hit detected AND (TP not hit OR SL hit before TP)
        sl_wins = (idx_sl_val < idx_tp_val)
        # Tie: Both hit at same index (rare but possible in discrete time)
        ties = (idx_tp_val == idx_sl_val) & (idx_tp_val < H)
        # No hit: Both H
        no_hits = (idx_tp_val == H) & (idx_sl_val == H)
        tp_first = np.sum(tp_wins)
        sl_first = np.sum(sl_wins)
        both_tie = np.sum(ties)
        no_hit = np.sum(no_hits)
        # Collect hit times (1-based) for stats
        # TP stats include strict wins and ties
        t_hit_tp = (idx_tp_val[tp_wins | ties] + 1).tolist()
        t_hit_sl = (idx_sl_val[sl_wins | ties] + 1).tolist()

        # Cumulative hit curves (hit at or before t)
        def _compute_cum_curve(indices, valid_mask, length):
            valid_indices = indices[valid_mask]
            if valid_indices.size == 0:
                return np.zeros(length, dtype=float)
            # bincount counts occurrences of each index
            counts = np.bincount(valid_indices, minlength=length)
            if counts.size > length:
                counts = counts[:length]
            return np.cumsum(counts).astype(float)

        tp_any_by_t = _compute_cum_curve(idx_tp, any_tp, H)
        sl_any_by_t = _compute_cum_curve(idx_sl, any_sl, H)
        S_f = float(S)
        # Ties are split evenly between TP and SL outcomes.
        prob_tp_first = (tp_first + 0.5 * both_tie) / S_f
        prob_sl_first = (sl_first + 0.5 * both_tie) / S_f
        prob_no_hit = no_hit / S_f
        tp_any_curve = (tp_any_by_t / S_f).tolist()
        sl_any_curve = (sl_any_by_t / S_f).tolist()

        def _stats(arr: list[int]) -> Dict[str, float]:
            # Mean/median of hit times; NaN placeholders when no path ever hit.
            if not arr:
                return {"mean": float('nan'), "median": float('nan')}
            a = np.asarray(arr, dtype=float)
            return {"mean": float(a.mean()), "median": float(np.median(a))}

        tf_secs = TIMEFRAME_SECONDS.get(timeframe, 0)
        tp_stats = _stats(t_hit_tp)
        sl_stats = _stats(t_hit_sl)

        def _finite_or_none(x: float) -> Optional[float]:
            # JSON-friendly: NaN/inf become None instead of leaking into output.
            try:
                return float(x) if np.isfinite(x) else None
            except Exception:
                return None

        # Directional interpretation:
        # - For long: TP is above last_price, SL is below; prob_tp_first is long win probability.
        # - For short: TP is below last_price, SL is above; prob_tp_first is short win probability.
        edge = float(prob_tp_first - prob_sl_first)
        out = {
            "success": True,
            "symbol": symbol,
            "timeframe": timeframe,
            "method": method,
            "horizon": int(horizon),
            "direction": direction,
            "last_price": last_price,
            "tp_price": float(tp_price),
            "sl_price": float(sl_price),
            "prob_tp_first": float(prob_tp_first),
            "prob_sl_first": float(prob_sl_first),
            "prob_no_hit": float(prob_no_hit),
            "edge": float(edge),
            "tp_hit_prob_by_t": [float(v) for v in tp_any_curve],
            "sl_hit_prob_by_t": [float(v) for v in sl_any_curve],
            "time_to_tp_bars": tp_stats,
            "time_to_sl_bars": sl_stats,
            "time_to_tp_seconds": {k: _finite_or_none(v * tf_secs) for k, v in tp_stats.items()},
            "time_to_sl_seconds": {k: _finite_or_none(v * tf_secs) for k, v in sl_stats.items()},
            "params_used": {k: p[k] for k in p if k in {"n_sims", "seed", "n_states", "p", "q", "block_size"}},
        }
        if 'model_summary' in sim:
            out['model_summary'] = str(sim['model_summary'])
        return out
    except Exception as e:
        return {"error": f"Error computing barrier probabilities: {str(e)}"}


def forecast_barrier_closed_form(
    symbol: str,
    timeframe: TimeframeLiteral = "H1",
    horizon: int = 12,
    direction: Literal['up', 'down'] = 'up',
    barrier: float = 0.0,
    mu: Optional[float] = None,
    sigma: Optional[float] = None,
    denoise: Optional[DenoiseSpec] = None,
) -> Dict[str, Any]:
    """Closed-form single-barrier hit probability for GBM within horizon.

    Calibrates annualized drift/volatility from log returns of recent history
    (unless ``mu`` / ``sigma`` are supplied; ``mu`` is interpreted as the
    annualized drift of log price), then evaluates the first-passage
    probability for an up-crossing.  A down-barrier is handled by inverting
    the process: 1/S is again a GBM with drift ``sigma^2 - gbm_drift``, so the
    same up-crossing formula applies to (1/S0, 1/barrier).

    Returns ``{"success": True, ..., "prob_hit": p}`` or ``{"error": ...}``.
    """
    try:
        need = int(max(400, horizon + 100))
        df = _fetch_history(symbol, timeframe, need, as_of=None)
        if len(df) < 10:
            return {"error": "Insufficient history"}
        base_col = 'close'
        if denoise:
            try:
                from ..utils.denoise import _apply_denoise as _apply_denoise_util
                added = _apply_denoise_util(df, denoise, default_when='pre_ti')
                if f"{base_col}_dn" in added:
                    base_col = f"{base_col}_dn"
            except Exception:
                # Denoising is best-effort; fall back to the raw close series.
                pass
        prices = np.asarray(df[base_col].astype(float).to_numpy(), dtype=float)
        prices = prices[np.isfinite(prices)]
        if prices.size < 5:
            return {"error": "Insufficient prices"}
        s0 = float(prices[-1])
        if barrier <= 0:
            return {"error": "Provide a positive barrier price"}
        tf_secs = TIMEFRAME_SECONDS.get(timeframe, 0)
        if not tf_secs:
            return {"error": f"Unsupported timeframe seconds for {timeframe}"}
        # Horizon expressed in calendar years (365-day annualization).
        T = float(tf_secs * int(horizon)) / (365.0 * 24.0 * 3600.0)
        if mu is None or sigma is None:
            with np.errstate(divide='ignore', invalid='ignore'):
                r = np.diff(np.log(np.maximum(prices, 1e-12)))
            r = r[np.isfinite(r)]
            if r.size < 5:
                return {"error": "Insufficient returns for calibration"}
            # Annualize per-bar log-return moments: mean scales with time,
            # std scales with sqrt(time).
            mu_hat = float(np.mean(r)) * (365.0 * 24.0 * 3600.0 / tf_secs)
            sigma_hat = float(np.std(r, ddof=1)) * (365.0 * 24.0 * 3600.0 / tf_secs) ** 0.5
            if mu is None:
                mu = mu_hat
            if sigma is None:
                sigma = sigma_hat
        log_drift = float(mu)
        sigma_val = float(sigma)
        if sigma_val <= 0:
            return {"error": "Sigma must be positive"}
        sigma_sq = sigma_val * sigma_val
        # GBM price drift from the log drift (Ito correction).
        gbm_drift = log_drift + 0.5 * sigma_sq
        dir_lower = str(direction).lower()
        if dir_lower == 'down':
            # Map the down-crossing of S onto an up-crossing of 1/S.
            s0_inv = 1.0 / s0
            b_inv = 1.0 / float(barrier)
            inv_drift = sigma_sq - gbm_drift
            prob = _gbm_upcross_prob(s0_inv, b_inv, float(inv_drift), sigma_val, float(T))
        else:
            prob = _gbm_upcross_prob(s0, float(barrier), float(gbm_drift), sigma_val, float(T))
        return {
            "success": True,
            "symbol": symbol,
            "timeframe": timeframe,
            "horizon": int(horizon),
            "direction": direction,
            "last_price": s0,
            "barrier": float(barrier),
            "mu_annual": float(gbm_drift),
            "log_drift_annual": float(log_drift),
            "sigma_annual": sigma_val,
            "prob_hit": float(prob),
        }
    except Exception as e:
        return {"error": f"Error computing closed-form barrier probability: {str(e)}"}


def forecast_barrier_optimize(
    symbol: str,
    timeframe: TimeframeLiteral = "H1",
    horizon: int = 12,
    method: Literal['mc_gbm', 'hmm_mc', 'garch', 'bootstrap'] = 'hmm_mc',
    direction: Literal['long', 'short'] = 'long',
    mode: Literal['pct', 'pips'] = 'pct',
    tp_min: float = 0.25,
    tp_max: float = 1.5,
    tp_steps: int = 7,
    sl_min: float = 0.25,
    sl_max: float = 2.5,
    sl_steps: int = 9,
    params: Optional[Dict[str, Any]] = None,
    denoise: Optional[DenoiseSpec] = None,
    objective: Literal['edge', 'prob_tp_first', 'kelly', 'ev', 'ev_uncond', 'kelly_uncond'] = 'edge',
    return_grid: bool = True,
    top_k: Optional[int] = None,
    output: Literal['full', 'summary'] = 'full',
    grid_style: Literal['fixed', 'volatility', 'ratio', 'preset'] = 'fixed',
    preset: Optional[str] = None,
    vol_window: int = 250,
    vol_min_mult: float = 0.5,
    vol_max_mult: float = 4.0,
    vol_steps: int = 7,
    vol_sl_extra: float = 1.8,
    vol_floor_pct: float = 0.15,
    vol_floor_pips: float = 8.0,
    ratio_min: float = 0.5,
    ratio_max: float = 4.0,
    ratio_steps: int = 8,
    refine: bool = False,
    refine_radius: float = 0.3,
    refine_steps: int = 5,
) -> Dict[str, Any]:
    """Optimize TP/SL barriers with support for presets, volatility scaling, ratios, and two-stage refinement.

    Simulates one shared set of price paths, builds a grid of (tp, sl)
    candidate pairs according to ``grid_style`` ('fixed' linspace grid,
    'volatility'-scaled grid, 'ratio' = SL grid x reward/risk ratios, or a
    named 'preset' from BARRIER_GRID_PRESETS), evaluates every candidate
    against the same paths, and ranks by ``objective``.  With ``refine=True``
    a second, tighter grid is searched around the first-pass winner.  Most
    keyword defaults can be overridden via the ``params`` dict (parsed by
    ``_parse_kv_or_json``), which takes precedence over the keyword values.
    Candidate units are percents when mode='pct', pips when mode='pips'.
    Returns ``{"success": True, "results": ..., "best": ..., "grid": ...}``
    or ``{"error": ...}``.
    """
    try:
        if timeframe not in TIMEFRAME_SECONDS:
            return {"error": f"Invalid timeframe: {timeframe}"}
        params_dict = _parse_kv_or_json(params)
        mode_val = str(mode).lower()
        objective_val = str(objective).lower()
        valid_objectives = {'edge', 'prob_tp_first', 'kelly', 'ev', 'kelly_uncond', 'ev_uncond'}
        if objective_val not in valid_objectives:
            objective_val = 'edge'  # silently fall back rather than erroring
        grid_style_val = str(params_dict.get('grid_style', grid_style)).lower()
        if grid_style_val not in {'fixed', 'volatility', 'ratio', 'preset'}:
            grid_style_val = 'fixed'
        preset_candidate = params_dict.get('grid_preset', params_dict.get('preset', preset))
        preset_val = str(preset_candidate).lower() if isinstance(preset_candidate, str) and preset_candidate else None
        refine_flag = bool(params_dict.get('refine', refine))
        refine_radius_val = max(0.0, float(params_dict.get('refine_radius', refine_radius)))
        refine_steps_val = max(2, int(params_dict.get('refine_steps', refine_steps)))
        ratio_min_val = float(params_dict.get('ratio_min', ratio_min))
        ratio_max_val = float(params_dict.get('ratio_max', ratio_max))
        ratio_steps_val = max(2, int(params_dict.get('ratio_steps', ratio_steps)))
        if ratio_min_val <= 0:
            ratio_min_val = ratio_min
        if ratio_max_val < ratio_min_val:
            ratio_max_val = ratio_min_val
        vol_window_val = int(params_dict.get('vol_window', vol_window))
        vol_min_mult_val = float(params_dict.get('vol_min_mult', vol_min_mult))
        vol_max_mult_val = float(params_dict.get('vol_max_mult', vol_max_mult))
        vol_steps_val = max(2, int(params_dict.get('vol_steps', vol_steps)))
        vol_sl_extra_val = float(params_dict.get('vol_sl_extra', vol_sl_extra))
        vol_sl_multiplier_val = float(params_dict.get('vol_sl_multiplier', vol_sl_extra_val))
        vol_sl_steps_val = max(vol_steps_val, int(params_dict.get('vol_sl_steps', vol_steps_val + 2)))
        vol_floor_pct_val = float(params_dict.get('vol_floor_pct', vol_floor_pct))
        vol_floor_pips_val = float(params_dict.get('vol_floor_pips', vol_floor_pips))
        # Optional risk/reward filter applied across all grid styles
        rr_min_val = params_dict.get('rr_min')
        rr_max_val = params_dict.get('rr_max')
        try:
            rr_min_val = float(rr_min_val) if rr_min_val is not None else None
        except Exception:
            rr_min_val = None
        try:
            rr_max_val = float(rr_max_val) if rr_max_val is not None else None
        except Exception:
            rr_max_val = None
        if rr_min_val is not None and rr_min_val <= 0:
            rr_min_val = None
        if rr_max_val is not None and rr_max_val <= 0:
            rr_max_val = None
        tp_min_val = float(params_dict.get('tp_min', tp_min))
        tp_max_val = float(params_dict.get('tp_max', tp_max))
        tp_steps_val = max(1, int(params_dict.get('tp_steps', tp_steps)))
        sl_min_val = float(params_dict.get('sl_min', sl_min))
        sl_max_val = float(params_dict.get('sl_max', sl_max))
        sl_steps_val = max(1, int(params_dict.get('sl_steps', sl_steps)))
        need = int(max(300, horizon + 100))
        df = _fetch_history(symbol, timeframe, need, as_of=None)
        if len(df) < 10:
            return {"error": "Insufficient history for simulation"}
        last_price = float(df['close'].astype(float).iloc[-1])
        # Resolve pip size as in forecast_barrier_hit_probabilities.
        pip_size = None
        try:
            info = mt5.symbol_info(symbol)
            if info is not None:
                digits = int(getattr(info, 'digits', 0) or 0)
                point = float(getattr(info, 'point', 0.0) or 0.0)
                if point > 0:
                    pip_size = float(point * (10.0 if digits in (3, 5) else 1.0))
        except Exception:
            pip_size = None
        if mode_val == 'pips' and (pip_size is None or pip_size <= 0):
            return {"error": "Pip size unavailable for this symbol; use mode='pct' or provide absolute barriers."}
        base_col = 'close'
        if denoise:
            try:
                from ..utils.denoise import _apply_denoise as _apply_denoise_util
                added = _apply_denoise_util(df, denoise, default_when='pre_ti')
                if f"{base_col}_dn" in added:
                    base_col = f"{base_col}_dn"
            except Exception:
                pass
        prices = df[base_col].astype(float).to_numpy()
        sims = int(params_dict.get('n_sims', params_dict.get('sims', 4000)) or 4000)
        seed = int(params_dict.get('seed', 42) or 42)
        # n_seeds > 1 stacks several independently-seeded simulations to reduce
        # Monte Carlo noise in the grid ranking.
        n_seeds = int(params_dict.get('n_seeds', 1) or 1)
        paths_list: List[np.ndarray] = []
        method_name = str(method).lower()
        if method_name == 'mc_gbm':
            for offset in range(max(1, n_seeds)):
                sim = _simulate_gbm_mc(prices, horizon=int(horizon), n_sims=int(sims), seed=int(seed + offset))
                paths_list.append(np.asarray(sim['price_paths'], dtype=float))
        elif method_name == 'hmm_mc':
            n_states = int(params_dict.get('n_states', 2) or 2)
            for offset in range(max(1, n_seeds)):
                sim = _simulate_hmm_mc(prices, horizon=int(horizon), n_states=int(n_states), n_sims=int(sims), seed=int(seed + offset))
                paths_list.append(np.asarray(sim['price_paths'], dtype=float))
        elif method_name == 'garch':
            p_order = int(params_dict.get('p', 1))
            q_order = int(params_dict.get('q', 1))
            for offset in range(max(1, n_seeds)):
                sim = _simulate_garch_mc(prices, horizon=int(horizon), n_sims=int(sims), seed=int(seed + offset), p_order=p_order, q_order=q_order)
                paths_list.append(np.asarray(sim['price_paths'], dtype=float))
        elif method_name == 'bootstrap':
            bs = params_dict.get('block_size')
            if bs:
                bs = int(bs)
            for offset in range(max(1, n_seeds)):
                sim = _simulate_bootstrap_mc(prices, horizon=int(horizon), n_sims=int(sims), seed=int(seed + offset), block_size=bs)
                paths_list.append(np.asarray(sim['price_paths'], dtype=float))
        else:
            return {"error": f"Unsupported method: {method}. Use 'mc_gbm', 'hmm_mc', 'garch', or 'bootstrap'"}
        paths = np.vstack(paths_list) if len(paths_list) > 1 else paths_list[0]
        S, H = paths.shape
        last_idx = H - 1  # NOTE(review): assigned but never read below

        def _linspace(a: float, b: float, n: int) -> np.ndarray:
            # Defensive linspace: degrades to a single point on bad input.
            try:
                return np.linspace(float(a), float(b), int(max(1, n)))
            except Exception:
                return np.array([float(a)])

        seen: Set[Tuple[int, int]] = set()
        base_candidates: List[Tuple[float, float]] = []

        def _push(tp_unit: float, sl_unit: float, bucket: List[Tuple[float, float]]) -> None:
            # Append one validated (tp, sl) candidate, de-duplicated to 1e-6 units.
            try:
                tp_val = float(tp_unit)
                sl_val = float(sl_unit)
            except (TypeError, ValueError):
                return
            if not np.isfinite(tp_val) or not np.isfinite(sl_val):
                return
            if tp_val <= 0 or sl_val <= 0:
                return
            key = (int(round(tp_val * 1e6)), int(round(sl_val * 1e6)))
            if key in seen:
                return
            seen.add(key)
            bucket.append((tp_val, sl_val))

        def _add_fixed(bucket: List[Tuple[float, float]], tp_a: float, tp_b: float, tp_n: int, sl_a: float, sl_b: float, sl_n: int) -> None:
            # Cartesian product of a TP linspace and an SL linspace.
            for tp_val in _linspace(tp_a, tp_b, tp_n):
                for sl_val in _linspace(sl_a, sl_b, sl_n):
                    _push(tp_val, sl_val, bucket)

        vol_context: Optional[Dict[str, Any]] = None  # NOTE(review): never populated below
        if grid_style_val == 'preset':
            preset_key = preset_val or 'intraday'
            cfg = BARRIER_GRID_PRESETS.get(preset_key, BARRIER_GRID_PRESETS['intraday'])
            if mode_val == 'pct':
                _add_fixed(base_candidates, cfg['tp_min'], cfg['tp_max'], int(cfg['tp_steps']), cfg['sl_min'], cfg['sl_max'], int(cfg['sl_steps']))
            else:
                # Presets are defined in percent; convert percent -> pips via
                # pips = pct/100 * price/pip_size.
                scale = (float(last_price) / float(pip_size)) / 100.0
                _add_fixed(base_candidates, cfg['tp_min'] * scale, cfg['tp_max'] * scale, int(cfg['tp_steps']), cfg['sl_min'] * scale, cfg['sl_max'] * scale, int(cfg['sl_steps']))
        elif grid_style_val == 'volatility':
            # Calculate simple volatility over window
            rets = np.diff(np.log(prices))
            if len(rets) > vol_window_val:
                rets = rets[-vol_window_val:]
            vol_per_bar = np.std(rets)
            vol_horizon = vol_per_bar * np.sqrt(horizon)
            # Convert to percentage space for baseline
            vol_pct = vol_horizon * 100.0
            if mode_val == 'pct':
                tp_start = max(vol_floor_pct_val, vol_pct * vol_min_mult_val)
                tp_end = max(tp_start * 1.1, vol_pct * vol_max_mult_val)
                sl_start = max(vol_floor_pct_val, vol_pct * vol_min_mult_val * 0.8)
                _add_fixed(base_candidates, tp_start, tp_end, vol_steps_val, sl_start, sl_start * vol_sl_multiplier_val, vol_sl_steps_val)
            else:
                # Convert volatility to pips and apply pips floor when in pips mode
                vol_pips = (vol_pct / 100.0) * (last_price / float(pip_size))
                tp_start = max(vol_floor_pips_val, vol_pips * vol_min_mult_val)
                tp_end = max(tp_start * 1.1, vol_pips * vol_max_mult_val)
                sl_start = max(vol_floor_pips_val, vol_pips * vol_min_mult_val * 0.8)
                _add_fixed(base_candidates, tp_start, tp_end, vol_steps_val, sl_start, sl_start * vol_sl_multiplier_val, vol_sl_steps_val)
        elif grid_style_val == 'ratio':
            # Fixed SL grid, TP derived from ratios
            sl_start = sl_min_val
            sl_end = sl_max_val
            for sl_val in _linspace(sl_start, sl_end, sl_steps_val):
                for r in _linspace(ratio_min_val, ratio_max_val, ratio_steps_val):
                    _push(sl_val * r, sl_val, base_candidates)
        else:  # fixed
            _add_fixed(base_candidates, tp_min_val, tp_max_val, tp_steps_val, sl_min_val, sl_max_val, sl_steps_val)
        # Evaluate candidates
        results: List[Dict[str, Any]] = []
        dir_long = str(direction).lower() == 'long'

        def _evaluate(bucket: List[Tuple[float, float]]) -> List[Dict[str, Any]]:
            # Score every (tp, sl) candidate against the shared path matrix.
            # Unlike forecast_barrier_hit_probabilities, ties are NOT split
            # 50/50 here — they are reported separately as prob_tie.
            out: List[Dict[str, Any]] = []
            for tp_unit, sl_unit in bucket:
                # Convert to price levels
                if mode_val == 'pct':
                    if dir_long:
                        tp_p = last_price * (1.0 + tp_unit / 100.0)
                        sl_p = last_price * (1.0 - sl_unit / 100.0)
                    else:
                        tp_p = last_price * (1.0 - tp_unit / 100.0)
                        sl_p = last_price * (1.0 + sl_unit / 100.0)
                else:  # pips
                    if dir_long:
                        tp_p = last_price + tp_unit * pip_size
                        sl_p = last_price - sl_unit * pip_size
                    else:
                        tp_p = last_price - tp_unit * pip_size
                        sl_p = last_price + sl_unit * pip_size
                # Vectorized hit detection
                if dir_long:
                    hit_tp = (paths >= tp_p)
                    hit_sl = (paths <= sl_p)
                else:
                    hit_tp = (paths <= tp_p)
                    hit_sl = (paths >= sl_p)
                any_tp = hit_tp.any(axis=1)
                any_sl = hit_sl.any(axis=1)
                first_tp = hit_tp.argmax(axis=1)
                first_sl = hit_sl.argmax(axis=1)
                # argmax yields 0 for all-False rows; push no-hit paths past the horizon.
                first_tp[~any_tp] = H
                first_sl[~any_sl] = H
                wins = (first_tp < first_sl)
                losses = (first_sl < first_tp)
                ties = (first_tp == first_sl) & (first_tp < H)
                n_wins = wins.sum()
                n_losses = losses.sum()
                prob_win = n_wins / S
                prob_loss = n_losses / S
                prob_tie = ties.sum() / S
                prob_neutral = max(0.0, 1.0 - prob_win - prob_loss)
                risk = sl_unit
                reward = tp_unit
                rr = reward / risk if risk > 0 else 0
                # Reward/risk filter: drop candidates outside the requested band.
                if rr_min_val and rr < rr_min_val:
                    continue
                if rr_max_val and rr > rr_max_val:
                    continue
                ev_uncond = prob_win * reward - prob_loss * risk
                edge = prob_win - prob_loss
                kelly_uncond = 0.0
                if rr > 0:
                    kelly_uncond = prob_win - (prob_loss / rr)
                # Conditional metrics (ignore neutral paths)
                active = prob_win + prob_loss
                if active > 0:
                    prob_win_c = prob_win / active
                    prob_loss_c = prob_loss / active
                    ev_cond = prob_win_c * reward - prob_loss_c * risk
                    kelly_cond = prob_win_c - (prob_loss_c / rr if rr > 0 else 0.0)
                else:
                    ev_cond = 0.0
                    kelly_cond = 0.0
                # NOTE(review): ev_cond/kelly_cond are computed but never added to
                # the result dict below — the 'ev'/'kelly' keys carry the
                # unconditional values. Confirm whether conditional metrics were
                # meant to be reported (and sorted on) as well.
                # Hit time medians (bars, 1-based) for transparency
                t_hit_tp = (first_tp[wins | ties] + 1)
                t_hit_sl = (first_sl[losses | ties] + 1)
                t_tp_med = float(np.median(t_hit_tp)) if t_hit_tp.size else None
                t_sl_med = float(np.median(t_hit_sl)) if t_hit_sl.size else None
                res = {
                    'tp': tp_unit,
                    'sl': sl_unit,
                    'rr': rr,
                    'prob_win': prob_win,
                    'prob_loss': prob_loss,
                    'prob_tp_first': prob_win,
                    'prob_sl_first': prob_loss,
                    'prob_no_hit': prob_neutral,
                    'prob_tie': prob_tie,
                    'ev': ev_uncond,
                    'ev_uncond': ev_uncond,
                    'edge': edge,
                    'kelly': kelly_uncond,
                    'kelly_uncond': kelly_uncond,
                    't_hit_tp_median': t_tp_med,
                    't_hit_sl_median': t_sl_med,
                }
                out.append(res)
            return out

        results.extend(_evaluate(base_candidates))

        def _sort(res_list: List[Dict[str, Any]]) -> None:
            # In-place descending sort on the metric named by objective_val.
            if objective_val == 'edge':
                res_list.sort(key=lambda x: x['edge'], reverse=True)
            elif objective_val == 'ev':
                res_list.sort(key=lambda x: x['ev'], reverse=True)
            elif objective_val == 'ev_uncond':
                res_list.sort(key=lambda x: x['ev_uncond'], reverse=True)
            elif objective_val == 'kelly':
                res_list.sort(key=lambda x: x['kelly'], reverse=True)
            elif objective_val == 'kelly_uncond':
                res_list.sort(key=lambda x: x['kelly_uncond'], reverse=True)
            elif objective_val == 'prob_tp_first':
                res_list.sort(key=lambda x: x['prob_win'], reverse=True)

        _sort(results)
        if refine_flag and results:
            # Second pass: a tighter grid around the first-pass winner,
            # merged into the same results list and re-ranked.
            best_seed = results[0]
            tp_c = best_seed['tp']
            sl_c = best_seed['sl']
            tp_a = max(1e-9, tp_c * (1.0 - refine_radius_val))
            tp_b = tp_c * (1.0 + refine_radius_val)
            sl_a = max(1e-9, sl_c * (1.0 - refine_radius_val))
            sl_b = sl_c * (1.0 + refine_radius_val)
            refine_candidates: List[Tuple[float, float]] = []
            _add_fixed(refine_candidates, tp_a, tp_b, refine_steps_val, sl_a, sl_b, refine_steps_val)
            results.extend(_evaluate(refine_candidates))
            _sort(results)
        if top_k:
            results = results[:top_k]
        grid_out = results if return_grid else None
        if output == 'summary' and grid_out is not None:
            limit = top_k or min(10, len(grid_out))
            grid_out = grid_out[:limit]
        return {
            "success": True,
            "symbol": symbol,
            "timeframe": timeframe,
            "method": method,
            "horizon": horizon,
            "direction": direction,
            "mode": mode,
            "objective": objective,
            "results": results,
            "best": results[0] if results else None,
            "grid": grid_out
        }
    except Exception as e:
        return {"error": f"Error optimizing barriers: {str(e)}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/emerzon/mt-data-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server