Skip to main content
Glama
emerzon

MetaTrader5 MCP Server

by emerzon
denoise.py15.1 kB
from typing import Any, Dict, List, Optional, Tuple import numpy as np import pandas as pd try: import pywt as _pywt # type: ignore except Exception: _pywt = None # optional try: from PyEMD import EMD as _EMD, EEMD as _EEMD, CEEMDAN as _CEEMDAN # type: ignore except Exception: _EMD = _EEMD = _CEEMDAN = None # optional def _denoise_series( s: pd.Series, method: str = 'none', params: Optional[Dict[str, Any]] = None, causality: str = 'zero_phase', ) -> pd.Series: if params is None: params = {} method = (method or 'none').lower().strip() n = len(s) if n < 3: return s if method == 'none': return s # Forward/backward fill using modern accessors to avoid FutureWarning x = s.astype(float).ffill().bfill().values if method == 'ema': alpha = params.get('alpha') span = params.get('span', 10) if alpha is not None: y = pd.Series(x).ewm(alpha=float(alpha), adjust=False).mean().values else: y = pd.Series(x).ewm(span=int(span), adjust=False).mean().values if causality == 'zero_phase': y2 = pd.Series(y[::-1]).ewm(span=int(span), adjust=False).mean().values[::-1] y = 0.5 * (y + y2) return pd.Series(y, index=s.index) if method == 'sma': window = max(1, int(params.get('window', 10))) if causality == 'zero_phase': y = pd.Series(x).rolling(window=window, center=True, min_periods=1).mean().values else: y = pd.Series(x).rolling(window=window, min_periods=1).mean().values return pd.Series(y, index=s.index) if method == 'median': window = max(1, int(params.get('window', 7))) if causality == 'zero_phase': y = pd.Series(x).rolling(window=window, center=True, min_periods=1).median().values else: y = pd.Series(x).rolling(window=window, min_periods=1).median().values return pd.Series(y, index=s.index) if method == 'lowpass_fft': cutoff_ratio = float(params.get('cutoff_ratio', 0.1)) X = np.fft.rfft(x) kmax = int(len(X) * cutoff_ratio) Y = np.zeros_like(X) Y[:max(1, kmax)] = X[:max(1, kmax)] y = np.fft.irfft(Y, n=len(x)) return pd.Series(y, index=s.index) if method == 'wavelet' and _pywt is not None: wavelet = str(params.get('wavelet', 'db4')) level = params.get('level') mode = str(params.get('mode', 'soft')) coeffs = _pywt.wavedec(x, wavelet, mode='periodization', level=level) sigma = np.median(np.abs(coeffs[-1])) / 0.6745 if len(coeffs) > 1 else np.std(x) thr = params.get('threshold', 'auto') thr_val = float(sigma * np.sqrt(2 * np.log(len(x)))) if thr == 'auto' else float(thr) new_coeffs = [coeffs[0]] for c in coeffs[1:]: new_coeffs.append(_pywt.threshold(c, thr_val, mode=mode)) y = _pywt.waverec(new_coeffs, wavelet, mode='periodization')[: len(x)] return pd.Series(y, index=s.index) if method in ('emd', 'eemd', 'ceemdan') and any(x is not None for x in (_EMD, _EEMD, _CEEMDAN)): xnp = np.asarray(x, dtype=float) max_imfs = params.get('max_imfs', 'auto') if isinstance(max_imfs, str) and max_imfs == 'auto': import math k = int(max(2, min(10, round(math.log2(len(xnp)))))) else: k = int(max_imfs) if method == 'emd' and _EMD is not None: emd = _EMD() imfs = emd.emd(xnp, max_imf=k) elif method == 'eemd' and _EEMD is not None: noise_strength = float(params.get('noise_strength', 0.2)) trials = int(params.get('trials', 100)) random_state = params.get('random_state') eemd = _EEMD(trials=trials, noise_strength=noise_strength) if random_state is not None: eemd.random_state = int(random_state) imfs = eemd.eemd(xnp, max_imf=k) else: if _CEEMDAN is not None: noise_strength = float(params.get('noise_strength', 0.2)) trials = int(params.get('trials', 100)) random_state = params.get('random_state') ce = _CEEMDAN(trials=trials, noise_strength=noise_strength) if random_state is not None: ce.random_state = int(random_state) imfs = ce.ceemdan(xnp, max_imf=k) else: return s imfs = np.atleast_2d(imfs) resid = xnp - imfs.sum(axis=0) k_all = list(range(imfs.shape[0])) keep_imfs = params.get('keep_imfs') drop_imfs = params.get('drop_imfs', [0]) if isinstance(keep_imfs, (list, tuple)) and len(keep_imfs) > 0: k_sel = [k for k in keep_imfs if 0 <= int(k) < imfs.shape[0]] elif isinstance(drop_imfs, (list, tuple)) and len(drop_imfs) > 0: drop = {int(k) for k in drop_imfs} k_sel = [k for k in k_all if k not in drop] else: k_sel = [k for k in k_all if k != 0] y = resid + imfs[k_sel].sum(axis=0) if len(k_sel) > 0 else resid return pd.Series(y, index=s.index) return s def _apply_denoise( df: pd.DataFrame, spec: Optional[Dict[str, Any]], default_when: str = 'post_ti', ) -> List[str]: added_cols: List[str] = [] if not spec or not isinstance(spec, dict): return added_cols method = str(spec.get('method', 'none')).lower() if method == 'none': return added_cols params = spec.get('params') or {} cols = spec.get('columns') or 'ohlcv' # Normalize columns selection if isinstance(cols, str): key = cols.strip().lower() if key in ('ohlcv', 'ohlc', 'price'): # Map to price + volume columns present selected = [] for name in ('open', 'high', 'low', 'close'): if name in df.columns: selected.append(name) # Prefer real volume, else tick_volume if 'volume' in df.columns: selected.append('volume') elif 'tick_volume' in df.columns: selected.append('tick_volume') cols = selected if selected else ['close'] elif key in ('all', '*', 'numeric'): try: cols = [ c for c in df.columns if c != 'time' and not str(c).startswith('_') and pd.api.types.is_numeric_dtype(df[c]) ] except Exception: cols = ['close'] else: # Support comma/space-separated list in CLI shorthand parts = [p.strip() for p in cols.replace(',', ' ').split() if p.strip()] cols = parts if parts else ['close'] when = str(spec.get('when') or default_when) causality = str(spec.get('causality') or ('causal' if when == 'pre_ti' else 'zero_phase')) keep_original = bool(spec.get('keep_original')) if 'keep_original' in spec else (when != 'pre_ti') suffix = str(spec.get('suffix') or '_dn') for col in cols: if col not in df.columns: continue try: y = _denoise_series(df[col], method=method, params=params, causality=causality) except Exception: continue if keep_original: new_col = f"{col}{suffix}" df[new_col] = y added_cols.append(new_col) else: df[col] = y return added_cols def get_denoise_methods_data() -> Dict[str, Any]: def avail_requires(name: str) -> Tuple[bool, str]: if name == 'wavelet': return (_pywt is not None, 'PyWavelets') if name in ('emd', 'eemd', 'ceemdan'): return (any(x is not None for x in (_EMD, _EEMD, _CEEMDAN)), 'EMD-signal') return (True, '') methods: List[Dict[str, Any]] = [] base_defaults = {"when": "pre_ti", "columns": ["close"], "keep_original": False, "suffix": "_dn"} def add(method: str, description: str, params: List[Dict[str, Any]], supports: Dict[str, bool]): available, requires = avail_requires(method) methods.append({ "method": method, "available": bool(available), "requires": requires, "description": description, "params": params, "supports": supports, "defaults": base_defaults, }) add("none", "No denoising (identity).", [], {"causal": True, "zero_phase": True}) add("ema", "Exponential moving average; causal by default; zero-phase via forward-backward pass.", [ {"name": "span", "type": "int", "default": 10, "description": "Smoothing span; alternative to alpha."}, {"name": "alpha", "type": "float", "default": None, "description": "Direct smoothing factor in (0,1]; overrides span if set."}, ], {"causal": True, "zero_phase": True}) add("sma", "Simple moving average; causal or zero-phase (centered convolution).", [ {"name": "window", "type": "int", "default": 10, "description": "Window length in samples."}, ], {"causal": True, "zero_phase": True}) add("median", "Rolling median; robust to spikes; causal or zero-phase (centered).", [ {"name": "window", "type": "int", "default": 7, "description": "Window length in samples (odd recommended)."}, ], {"causal": True, "zero_phase": True}) add("lowpass_fft", "Zero-phase low-pass filtering in frequency domain; parameterized by cutoff ratio.", [ {"name": "cutoff_ratio", "type": "float", "default": 0.1, "description": "Cutoff as fraction of Nyquist (0, 0.5]."}, ], {"causal": False, "zero_phase": True}) add("wavelet", "Wavelet shrinkage denoising using PyWavelets; preserves sharp changes better than linear filters.", [ {"name": "wavelet", "type": "str", "default": "db4", "description": "Wavelet family, e.g., 'db4', 'sym5'."}, {"name": "level", "type": "int|null", "default": None, "description": "Decomposition level; auto if omitted."}, {"name": "threshold", "type": "float|\"auto\"", "default": "auto", "description": "Shrinkage threshold; 'auto' uses universal threshold."}, {"name": "mode", "type": "str", "default": "soft", "description": "Shrinkage mode: 'soft' or 'hard'."}, ], {"causal": False, "zero_phase": True}) add("emd", "Empirical Mode Decomposition; reconstruct after dropping high-frequency IMFs.", [ {"name": "drop_imfs", "type": "int[]", "default": [0], "description": "IMF indices to drop (0 is highest frequency)."}, {"name": "keep_imfs", "type": "int[]", "default": None, "description": "Explicit list of IMFs to keep; overrides drop_imfs."}, {"name": "max_imfs", "type": "int|\"auto\"", "default": "auto", "description": "Max IMFs; 'auto' ≈ log2(n), capped to [2,10]."}, ], {"causal": False, "zero_phase": True}) add("eemd", "Ensemble EMD; averages decompositions with added noise for robustness.", [ {"name": "drop_imfs", "type": "int[]", "default": [0], "description": "IMF indices to drop (0 is highest frequency)."}, {"name": "keep_imfs", "type": "int[]", "default": None, "description": "Explicit list of IMFs to keep; overrides drop_imfs."}, {"name": "max_imfs", "type": "int|\"auto\"", "default": "auto", "description": "Max IMFs; 'auto' ≈ log2(n), capped to [2,10]."}, {"name": "noise_strength", "type": "float", "default": 0.2, "description": "Relative noise amplitude used in ensembles."}, {"name": "trials", "type": "int", "default": 100, "description": "Number of ensemble trials."}, {"name": "random_state", "type": "int", "default": None, "description": "Random seed for reproducibility."}, ], {"causal": False, "zero_phase": True}) add("ceemdan", "Complementary EEMD with adaptive noise; improved reconstruction quality.", [ {"name": "drop_imfs", "type": "int[]", "default": [0], "description": "IMF indices to drop (0 is highest frequency)."}, {"name": "keep_imfs", "type": "int[]", "default": None, "description": "Explicit list of IMFs to keep; overrides drop_imfs."}, {"name": "max_imfs", "type": "int|\"auto\"", "default": "auto", "description": "Max IMFs; 'auto' ≈ log2(n), capped to [2,10]."}, {"name": "noise_strength", "type": "float", "default": 0.2, "description": "Relative noise amplitude used in decomposition."}, {"name": "trials", "type": "int", "default": 100, "description": "Used if falling back to EEMD implementation."}, {"name": "random_state", "type": "int", "default": None, "description": "Random seed for reproducibility."}, ], {"causal": False, "zero_phase": True}) return {"success": True, "schema_version": 1, "methods": methods} def denoise_list_methods() -> Dict[str, Any]: """List available denoise methods and their parameters.""" try: return get_denoise_methods_data() except Exception as e: return {"error": f"Error listing denoise methods: {e}"} def normalize_denoise_spec(spec: Any, default_when: str = 'pre_ti') -> Optional[Dict[str, Any]]: """Normalize a denoise spec. Accepts dict-like or a method name string. Returns a dict with keys: method, params, columns, when, causality, keep_original, suffix. """ base = {"when": default_when, "columns": ["close"], "keep_original": False, "suffix": "_dn"} if not spec: return None if isinstance(spec, dict): out = dict(base) out.update({k: v for k, v in spec.items() if v is not None}) # Normalize columns field to list cols = out.get('columns') if isinstance(cols, str): parts = [p.strip() for p in cols.replace(',', ' ').split() if p.strip()] out['columns'] = parts if parts else ['close'] if 'params' not in out or out['params'] is None: out['params'] = {} return out # String method name try: method = str(spec).strip().lower() except Exception: return None if method == '' or method == 'none': return None # Method-specific default params params: Dict[str, Any] = {} if method == 'ema': params = {"span": 10} elif method == 'sma': params = {"window": 10} elif method == 'median': params = {"window": 7} elif method == 'lowpass_fft': params = {"cutoff_ratio": 0.1} elif method == 'wavelet': params = {"wavelet": "db4", "level": None, "threshold": "auto", "mode": "soft"} elif method == 'emd': params = {"drop_imfs": [0], "keep_imfs": None, "max_imfs": "auto"} elif method == 'eemd': params = {"drop_imfs": [0], "keep_imfs": None, "max_imfs": "auto", "noise_strength": 0.2, "trials": 100} elif method == 'ceemdan': params = {"drop_imfs": [0], "keep_imfs": None, "max_imfs": "auto", "noise_strength": 0.2, "trials": 100} else: # Unknown method; ignore return None out = dict(base) out.update({"method": method, "params": params}) return out

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/emerzon/mt-data-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server