Skip to main content
Glama
emerzon

MetaTrader5 MCP Server

by emerzon
patterns.py (25 kB)
# NOTE: the relative order of the project-local imports is kept exactly as in
# the original file so that any import-order dependency between the package's
# modules is not disturbed; the block is only de-duplicated.
from datetime import datetime
from typing import Any, Dict, Optional, List, Tuple, Set, Literal
import pandas as pd
import warnings
import numpy as np
from .schema import TimeframeLiteral
from .constants import TIMEFRAME_MAP
from ..utils.mt5 import _mt5_copy_rates_from, _mt5_epoch_to_utc
from ..utils.utils import (
    _csv_from_rows_util,
    _format_time_minimal_util,
    _format_time_minimal_local_util,
    _use_client_tz_util,
    _time_format_from_epochs_util,
    _maybe_strip_year_util,
    _style_time_format_util,
    to_float_np as __to_float_np,
)
from ..patterns.classic import (
    detect_classic_patterns as _detect_classic_patterns,
    ClassicDetectorConfig as _ClassicCfg,
)
from ..patterns.eliott import (
    detect_elliott_waves as _detect_elliott_waves,
    ElliottWaveConfig as _ElliottCfg,
)
from .server import mcp, _auto_connect_wrapper, _ensure_symbol_ready
from ..utils.denoise import (
    _apply_denoise as _apply_denoise_util,
    normalize_denoise_spec as _normalize_denoise_spec,
)
import MetaTrader5 as mt5


# Candlestick patterns kept when ``robust_only`` is set and no whitelist is given.
_DEFAULT_ROBUST_PATTERNS = frozenset({
    'engulfing', 'harami', '3inside', '3outside', 'eveningstar', 'morningstar',
    'darkcloudcover', 'piercing', 'inside', 'outside', 'hikkake',
})

# Patterns that are only reported for a bar when nothing else fired on it.
_DEPRIORITIZED_PATTERNS = frozenset({
    'shortline', 'longline', 'spinningtop', 'highwave', 'marubozu',
    'closingmarubozu', 'doji', 'gravestonedoji', 'longleggeddoji', 'rickshawman',
})

# An Elliott wave ending within this many bars of the series end is "forming".
_ELLIOTT_RECENT_BARS = 3

# Classic/Elliott modes always request at least this many bars ...
_MIN_FETCH_BARS = 400
# ... and refuse to run on fewer than this many.
_MIN_USABLE_BARS = 100

_TRUE_STRINGS = frozenset({'1', 'true', 'yes', 'on'})
_FALSE_STRINGS = frozenset({'0', 'false', 'no', 'off', ''})


def _invalid_timeframe_error(timeframe: Any) -> Dict[str, Any]:
    """Build the error payload returned for an unknown timeframe."""
    return {"error": f"Invalid timeframe: {timeframe}. Valid options: {list(TIMEFRAME_MAP.keys())}"}


def _symbol_visibility(symbol: str) -> Optional[bool]:
    """Return whether *symbol* is currently visible, or None if MT5 has no info for it."""
    info = mt5.symbol_info(symbol)
    return bool(info.visible) if info is not None else None


def _restore_symbol_visibility(symbol: str, was_visible: Optional[bool]) -> None:
    """Hide *symbol* again if it was hidden before we touched it (best effort)."""
    if was_visible is False:
        try:
            mt5.symbol_select(symbol, False)
        except Exception:
            # Restoring visibility is a courtesy; never let it mask the real result.
            pass


def _normalize_pattern_name(name: Any) -> str:
    """Lower-case *name* and drop underscores/spaces so names compare loosely."""
    return str(name).replace('_', '').replace(' ', '').lower()


def _round_detail(value: Any) -> Any:
    """Round numeric detail values to 8 decimals; return anything else untouched."""
    try:
        return float(np.round(float(value), 8))
    except Exception:
        return value


def _format_epoch_or_none(epoch: Optional[float]) -> Optional[str]:
    """Format an epoch with the project's minimal time formatter, or None on failure."""
    if epoch is None:
        return None
    try:
        return _format_time_minimal_util(epoch)
    except Exception:
        return None


def _coerce_like(current: Any, value: Any) -> Any:
    """Convert *value* to the type of *current* (raises if the conversion fails).

    Boolean fields get explicit string handling because ``bool("false")`` is
    ``True``, which would silently invert a textual override.
    """
    if isinstance(current, bool) and isinstance(value, str):
        token = value.strip().lower()
        if token in _FALSE_STRINGS:
            return False
        if token in _TRUE_STRINGS:
            return True
    return type(current)(value)


def _apply_config_overrides(cfg: Any, config: Optional[Dict[str, Any]]) -> Any:
    """Copy matching keys of *config* onto *cfg*, coercing to the existing field type.

    Keys that are not attributes of *cfg* are ignored. If coercion fails the raw
    value is assigned instead; if that fails too the key is skipped.
    """
    if not isinstance(config, dict):
        return cfg
    for key, value in config.items():
        if not hasattr(cfg, key):
            continue
        try:
            setattr(cfg, key, _coerce_like(getattr(cfg, key), value))
        except Exception:
            try:
                setattr(cfg, key, value)
            except Exception:
                pass
    return cfg


def _fetch_closed_bars(
    symbol: str,
    timeframe: Any,
    limit: int,
    denoise: Optional[Dict[str, Any]],
) -> Tuple[Optional[pd.DataFrame], Optional[Dict[str, Any]]]:
    """Fetch bars for the classic/Elliott detectors.

    Returns ``(df, None)`` on success or ``(None, error_payload)`` on failure.
    The most recent bar is dropped, optional denoising is applied, and the
    frame is trimmed to the last *limit* rows.
    """
    if timeframe not in TIMEFRAME_MAP:
        return None, _invalid_timeframe_error(timeframe)
    mt5_tf = TIMEFRAME_MAP[timeframe]

    was_visible = _symbol_visibility(symbol)
    try:
        if was_visible is False:
            try:
                mt5.symbol_select(symbol, True)
            except Exception:
                pass
        count = max(_MIN_FETCH_BARS, int(limit) + 2)
        rates = _mt5_copy_rates_from(symbol, mt5_tf, datetime.utcnow(), count)
    finally:
        # Previously these modes left a hidden symbol selected; the candlestick
        # mode already restored it, so do the same here.
        _restore_symbol_visibility(symbol, was_visible)

    if rates is None or len(rates) < _MIN_USABLE_BARS:
        return None, {"error": f"Failed to fetch sufficient bars for {symbol}"}

    df = pd.DataFrame(rates)
    if 'volume' not in df.columns and 'tick_volume' in df.columns:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            df['volume'] = df['tick_volume']

    # Drop the last row (presumably the still-open bar -- confirm against
    # _mt5_copy_rates_from). Copy so the denoiser mutates our own frame.
    if len(df) >= 2:
        df = df.iloc[:-1].copy()

    if denoise:
        try:
            spec = _normalize_denoise_spec(denoise, default_when='pre_ti')
            if spec:
                _apply_denoise_util(df, spec, default_when='pre_ti')
        except Exception:
            # Denoising is optional; fall back to the raw series on any failure.
            pass

    if len(df) > int(limit):
        df = df.iloc[-int(limit):].copy()
    return df, None


def _estimate_bars_to_completion(
    name: str,
    details: Dict[str, Any],
    start_idx: int,
    end_idx: int,
    n_bars: int,
) -> Optional[int]:
    """Estimate how many bars remain until a forming classic pattern completes.

    For patterns described by two boundary lines (``top_*``/``bottom_*`` or
    ``upper_*``/``lower_*`` slope+intercept) this is the distance from the last
    bar to the lines' intersection, capped at three pattern lengths. For
    pennants/flags it is 30% of the pattern length (at least 1). Returns None
    when no estimate can be made.
    """
    try:
        length = max(1, int(end_idx) - int(start_idx) + 1)
        nm = str(name).lower()
        for top, bottom in (("top", "bottom"), ("upper", "lower")):
            keys = (f"{top}_slope", f"{top}_intercept", f"{bottom}_slope", f"{bottom}_intercept")
            if not all(k in details for k in keys):
                continue
            s_top, b_top, s_bot, b_bot = (float(details[k]) for k in keys)
            denom = s_top - s_bot
            if abs(denom) <= 1e-12:
                return None  # parallel lines never converge
            t_star = (b_bot - b_top) / denom
            bars = max(0, int(round(t_star - (n_bars - 1))))
            return int(min(bars, 3 * length))
        if "pennant" in nm or "flag" in nm:
            return int(max(1, min(2 * length, int(round(0.3 * length)))))
    except Exception:
        return None
    return None


def _build_pattern_response(
    symbol: str,
    timeframe: Any,
    mode: str,
    limit: int,
    patterns: List[Dict[str, Any]],
    include_completed: bool,
    include_series: bool,
    series_time: str,
    df: pd.DataFrame,
) -> Dict[str, Any]:
    """Assemble the classic/Elliott response, filtering and attaching series as asked."""
    if include_completed:
        filtered = patterns
    else:
        filtered = [d for d in patterns if str(d.get('status', '')).lower() == 'forming']
    resp: Dict[str, Any] = {
        "success": True,
        "symbol": symbol,
        "timeframe": timeframe,
        "lookback": int(limit),
        "mode": mode,
        "patterns": filtered,
        "n_patterns": int(len(filtered)),
    }
    if include_series:
        resp["series_close"] = [float(v) for v in __to_float_np(df.get('close')).tolist()]
        if 'time' in df.columns:
            times = __to_float_np(df.get('time')).tolist()
            if str(series_time).lower() == 'epoch':
                resp["series_epoch"] = [float(v) for v in times]
            else:
                resp["series_time"] = [_format_time_minimal_util(float(v)) for v in times]
    return resp


def _detect_candlesticks(
    symbol: str,
    timeframe: Any,
    mode: str,
    limit: int,
    min_strength: float,
    min_gap: int,
    robust_only: bool,
    whitelist: Optional[str],
    top_k: int,
) -> Dict[str, Any]:
    """Run every ``cdl_*`` detector exposed by the DataFrame ``.ta`` accessor and tabulate hits."""
    if timeframe not in TIMEFRAME_MAP:
        return _invalid_timeframe_error(timeframe)
    mt5_timeframe = TIMEFRAME_MAP[timeframe]

    was_visible = _symbol_visibility(symbol)
    err = _ensure_symbol_ready(symbol)
    if err:
        return {"error": err}
    try:
        rates = _mt5_copy_rates_from(symbol, mt5_timeframe, datetime.utcnow(), limit)
    finally:
        _restore_symbol_visibility(symbol, was_visible)

    if rates is None:
        return {"error": f"Failed to get rates for {symbol}: {mt5.last_error()}"}
    if len(rates) == 0:
        return {"error": "No candle data available"}

    df = pd.DataFrame(rates)
    try:
        if 'time' in df.columns:
            df['time'] = df['time'].astype(float).apply(_mt5_epoch_to_utc)
    except Exception:
        pass
    epochs = [float(t) for t in df['time'].tolist()] if 'time' in df.columns else []

    use_client_tz = _use_client_tz_util()
    if use_client_tz:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            df['time'] = df['time'].apply(_format_time_minimal_local_util)
    else:
        time_fmt = _time_format_from_epochs_util(epochs) if epochs else "%Y-%m-%d %H:%M"
        time_fmt = _maybe_strip_year_util(time_fmt, epochs)
        time_fmt = _style_time_format_util(time_fmt)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            df['time'] = df['time'].apply(
                lambda t: datetime.utcfromtimestamp(float(t)).strftime(time_fmt)
            )

    for col in ('open', 'high', 'low', 'close'):
        if col not in df.columns:
            return {"error": f"Missing '{col}' data from rates"}

    # Work on a copy indexed by timestamp; fall back to a plain copy if that fails.
    try:
        temp = df.copy()
        temp['__epoch'] = [float(e) for e in epochs]
        temp.index = pd.to_datetime(temp['__epoch'], unit='s')
    except Exception:
        temp = df.copy()

    # NOTE(review): ``.ta`` is the pandas_ta accessor; this module does not import
    # pandas_ta itself, so it is presumably registered elsewhere -- confirm.
    pattern_methods: List[str] = []
    try:
        for attr in dir(temp.ta):
            if attr.startswith('cdl_') and callable(getattr(temp.ta, attr, None)):
                pattern_methods.append(attr)
    except Exception:
        pass
    if not pattern_methods:
        return {"error": "No candlestick pattern detectors (cdl_*) found in pandas_ta."}

    before_cols = set(temp.columns)
    for method_name in sorted(pattern_methods):
        try:
            detector = getattr(temp.ta, method_name)
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                detector(append=True)
        except Exception:
            continue  # a failing detector must not abort the others
    pattern_cols = [
        c for c in temp.columns
        if c not in before_cols and c.lower().startswith('cdl_')
    ]
    if not pattern_cols:
        return {"error": "No candle patterns produced any outputs."}

    # Strength threshold: accept 0..1 or a percentage, then clamp to [0, 1].
    try:
        thr = float(min_strength)
    except Exception:
        thr = 0.95
    if thr > 1.0:
        thr = thr / 100.0
    thr = max(0.0, min(1.0, thr))
    threshold_abs = thr * 100.0  # detector outputs are compared on a 0..100 scale

    robust_names = set(_DEFAULT_ROBUST_PATTERNS)
    if whitelist and isinstance(whitelist, str):
        parts = [p.strip() for p in whitelist.split(',') if p.strip()]
        if parts:
            robust_names = {_normalize_pattern_name(p) for p in parts}

    try:
        gap = max(0, int(min_gap))
    except Exception:
        gap = 3
    try:
        k = max(1, int(top_k))
    except Exception:
        k = 1

    # Resolve everything that does not depend on the row once per column:
    # display name, eligibility, priority, and a float array of the values.
    candidates: List[Tuple[str, Any, bool]] = []
    for col in pattern_cols:
        name = col[len('cdl_'):] if col.lower().startswith('cdl_') else col
        if robust_only and _normalize_pattern_name(name) not in robust_names:
            continue
        try:
            values = pd.to_numeric(temp[col], errors='coerce').to_numpy(dtype=float)
        except Exception:
            continue  # unusable column; per-cell reads would have failed too
        deprioritized = name.split('_')[0].lower() in _DEPRIORITIZED_PATTERNS
        candidates.append((name, values, deprioritized))

    times = df['time'].tolist() if 'time' in df.columns else None
    rows: List[List[Any]] = []
    last_pick_idx = -10**9
    for i in range(len(temp)):
        # NaN never satisfies ``>=``, so missing outputs are skipped naturally.
        hits = [
            (name, float(values[i]), deprioritized)
            for name, values, deprioritized in candidates
            if abs(values[i]) >= threshold_abs
        ]
        if not hits:
            continue
        if i - last_pick_idx < gap:
            continue
        preferred = [h for h in hits if not h[2]]
        pool = preferred if preferred else hits
        picks = sorted(pool, key=lambda h: abs(h[1]), reverse=True)[:k]
        t_val = str(times[i]) if times is not None else ''
        for name, value, _dep in picks:
            label_core = name.replace('_', ' ').strip().upper()
            direction = 'Bullish' if value > 0 else 'Bearish'
            rows.append([t_val, f"{direction} {label_core}" if label_core else direction])
        last_pick_idx = i

    payload = _csv_from_rows_util(["time", "pattern"], rows)
    payload.update({
        "success": True,
        "symbol": symbol,
        "timeframe": timeframe,
        "candles": int(limit),
        "mode": mode,
    })
    if not use_client_tz:
        payload["timezone"] = "UTC"
    return payload


def _detect_classic(
    symbol: str,
    timeframe: Any,
    mode: str,
    limit: int,
    denoise: Optional[Dict[str, Any]],
    config: Optional[Dict[str, Any]],
    include_series: bool,
    series_time: str,
    include_completed: bool,
) -> Dict[str, Any]:
    """Detect classic chart patterns and describe each one as a plain dict."""
    df, error = _fetch_closed_bars(symbol, timeframe, limit, denoise)
    if error is not None:
        return error

    cfg = _apply_config_overrides(_ClassicCfg(), config)
    n_bars = len(df)
    described: List[Dict[str, Any]] = []
    for p in _detect_classic_patterns(df, cfg):
        try:
            st_epoch = float(p.start_time) if p.start_time is not None else None
            et_epoch = float(p.end_time) if p.end_time is not None else None
            d: Dict[str, Any] = {
                "name": p.name,
                "status": p.status,
                "confidence": float(max(0.0, min(1.0, p.confidence))),
                "start_index": int(p.start_index),
                "end_index": int(p.end_index),
                "start_date": _format_epoch_or_none(st_epoch),
                "end_date": _format_epoch_or_none(et_epoch),
                "details": {k: _round_detail(v) for k, v in (p.details or {}).items()},
            }
            if p.status == 'forming':
                est = _estimate_bars_to_completion(
                    p.name, d["details"], d["start_index"], d["end_index"], n_bars
                )
                if est is not None:
                    d["bars_to_completion"] = int(est)
            described.append(d)
        except Exception:
            continue  # skip a malformed pattern rather than fail the whole call

    return _build_pattern_response(
        symbol, timeframe, mode, limit, described,
        include_completed, include_series, series_time, df,
    )


def _detect_elliott(
    symbol: str,
    timeframe: Any,
    mode: str,
    limit: int,
    denoise: Optional[Dict[str, Any]],
    config: Optional[Dict[str, Any]],
    include_series: bool,
    series_time: str,
    include_completed: bool,
) -> Dict[str, Any]:
    """Detect Elliott waves and describe each one as a plain dict."""
    df, error = _fetch_closed_bars(symbol, timeframe, limit, denoise)
    if error is not None:
        return error

    cfg = _apply_config_overrides(_ElliottCfg(), config)
    n_bars = len(df)
    described: List[Dict[str, Any]] = []
    for p in _detect_elliott_waves(df, cfg):
        try:
            st_epoch = float(p.start_time) if p.start_time is not None else None
            et_epoch = float(p.end_time) if p.end_time is not None else None
            start_date = _format_epoch_or_none(st_epoch)
            end_date = _format_epoch_or_none(et_epoch)
            # A wave whose last point is among the final few bars is still forming.
            is_forming = int(p.end_index) >= int(n_bars - _ELLIOTT_RECENT_BARS)
            described.append({
                "wave_type": p.wave_type,
                "status": 'forming' if is_forming else 'completed',
                "confidence": float(max(0.0, min(1.0, p.confidence))),
                "start_index": int(p.start_index),
                "end_index": int(p.end_index),
                "start_date": start_date,
                "end_date": end_date,
                "details": {k: _round_detail(v) for k, v in (p.details or {}).items()},
            })
        except Exception:
            continue  # skip a malformed wave rather than fail the whole call

    return _build_pattern_response(
        symbol, timeframe, mode, limit, described,
        include_completed, include_series, series_time, df,
    )


@mcp.tool()
@_auto_connect_wrapper
def patterns_detect(
    symbol: str,
    timeframe: TimeframeLiteral = "H1",
    mode: Literal['candlestick', 'classic', 'elliott'] = 'candlestick',  # type: ignore
    limit: int = 1000,
    # Candlestick specific
    min_strength: float = 0.95,
    min_gap: int = 3,
    robust_only: bool = True,
    whitelist: Optional[str] = None,
    top_k: int = 1,
    # Classic/Elliott specific
    denoise: Optional[Dict[str, Any]] = None,
    config: Optional[Dict[str, Any]] = None,
    include_series: bool = False,
    series_time: str = "string",
    include_completed: bool = False,
) -> Dict[str, Any]:
    """Detect chart patterns (candlestick, classic chart patterns, or Elliott Wave).

    **REQUIRED**: symbol parameter must be provided (e.g., "EURUSD", "BTCUSD")

    Parameters:
    -----------
    symbol : str (REQUIRED)
        Trading symbol to analyze (e.g., "EURUSD", "GBPUSD", "BTCUSD")
    timeframe : str, optional (default="H1")
        Chart timeframe; must be a key of TIMEFRAME_MAP
        (e.g. "M1", "M5", "M15", "M30", "H1", "H4", "D1", "W1", "MN1")
    mode : str, optional (default="candlestick")
        Pattern detection method:
        - "candlestick": Japanese candlestick patterns (Doji, Hammer, Engulfing, etc.)
        - "classic": Chart patterns (Head & Shoulders, Triangles, Flags, etc.)
        - "elliott": Elliott Wave patterns
    limit : int, optional (default=1000)
        Number of historical bars to analyze

    Candlestick Mode Parameters:
    ----------------------------
    min_strength : float, optional (default=0.95)
        Minimum pattern strength, 0.0 to 1.0. Values above 1 are read as a
        percentage (95 -> 0.95); the result is clamped to [0, 1].
    min_gap : int, optional (default=3)
        Minimum distance, in bars, between two reported bars
    robust_only : bool, optional (default=True)
        Restrict results to the whitelist (a built-in list of multi-bar
        patterns unless ``whitelist`` is given). When False every detector
        output is eligible and ``whitelist`` has no effect.
    whitelist : str, optional
        Comma-separated pattern names replacing the built-in list
        (e.g., "doji,hammer,engulfing"); only used when ``robust_only`` is True
    top_k : int, optional (default=1)
        Report at most the K strongest patterns per bar

    Classic/Elliott Mode Parameters:
    ---------------------------------
    denoise : dict, optional
        Denoising configuration to smooth price data (best effort: the raw
        data is used if denoising fails)
    config : dict, optional
        Overrides for the detector configuration; unknown keys are ignored
    include_series : bool, optional (default=False)
        Include the close series (and times) in the response
    series_time : str, optional (default="string")
        "epoch" for numeric timestamps, anything else for formatted strings
    include_completed : bool, optional (default=False)
        When True, completed patterns are returned in addition to forming
        ones; when False only patterns still forming are returned

    Returns:
    --------
    dict
        On failure: ``{"error": "<message>"}``.
        Classic/Elliott modes: success, symbol, timeframe, lookback, mode,
        patterns (list of dicts), n_patterns, plus series_* keys on request.
        Candlestick mode: the tabular payload built from ("time", "pattern")
        rows, extended with success, symbol, timeframe, candles, mode and,
        when client time zone formatting is off, timezone="UTC".

    Examples:
    ---------
    # Detect candlestick patterns
    patterns_detect(symbol="EURUSD")

    # Detect candlestick patterns on M15 with custom parameters
    patterns_detect(symbol="EURUSD", timeframe="M15", min_strength=0.90, top_k=3)

    # Detect classic chart patterns
    patterns_detect(symbol="GBPUSD", mode="classic", limit=500)

    # Detect Elliott Wave patterns
    patterns_detect(symbol="BTCUSD", mode="elliott", timeframe="H4")
    """
    try:
        if mode == 'candlestick':
            return _detect_candlesticks(
                symbol, timeframe, mode, limit,
                min_strength, min_gap, robust_only, whitelist, top_k,
            )
        if mode == 'classic':
            return _detect_classic(
                symbol, timeframe, mode, limit,
                denoise, config, include_series, series_time, include_completed,
            )
        if mode == 'elliott':
            return _detect_elliott(
                symbol, timeframe, mode, limit,
                denoise, config, include_series, series_time, include_completed,
            )
        return {"error": f"Unknown mode: {mode}"}
    except Exception as e:
        # Tool boundary: report any unexpected failure as an error payload.
        return {"error": f"Error detecting patterns: {str(e)}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/emerzon/mt-data-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.