Skip to main content
Glama
emerzon

MetaTrader5 MCP Server

by emerzon
regime.py12.2 kB
from typing import Any, Dict, Optional, List, Literal import numpy as np import pandas as pd from .server import mcp, _auto_connect_wrapper from .schema import TimeframeLiteral, DenoiseSpec from ..forecast.common import fetch_history as _fetch_history from ..utils.utils import _format_time_minimal as _format_time_minimal_util from ..utils.denoise import _apply_denoise as _apply_denoise_util def _attach_toon_rows(payload: Dict[str, Any], method: str) -> Dict[str, Any]: """Attach tabular rows so TOON output keeps columns aligned.""" try: times = payload.get("times") if not isinstance(times, list) or not times: return payload rows: List[Dict[str, Any]] = [] if method == "bocpd": probs = payload.get("cp_prob") if isinstance(probs, list): cp_times = set() cps = payload.get("change_points") if isinstance(payload.get("change_points"), list) else [] for cp in cps or []: t = cp.get("time") if isinstance(cp, dict) else None if isinstance(t, str): cp_times.add(t) for t, p in zip(times, probs): rows.append({"time": t, "cp_prob": p, "change_point": t in cp_times}) elif method in ("ms_ar", "hmm"): state = payload.get("state") probs = payload.get("state_probabilities") if isinstance(state, list) and isinstance(probs, list) and probs and isinstance(probs[0], list): n_states = len(probs[0]) for t, s, prow in zip(times, state, probs): row: Dict[str, Any] = {"time": t, "state": s} for j in range(n_states): row[f"p{j}"] = prow[j] if j < len(prow) else None rows.append(row) elif isinstance(state, list): rows = [{"time": t, "state": s} for t, s in zip(times, state)] if rows: payload["rows"] = rows return payload except Exception: return payload @mcp.tool() @_auto_connect_wrapper def regime_detect( symbol: str, timeframe: TimeframeLiteral = "H1", limit: int = 800, method: Literal['bocpd','hmm','ms_ar'] = 'bocpd', # type: ignore target: Literal['return','price'] = 'return', # type: ignore params: Optional[Dict[str, Any]] = None, denoise: Optional[DenoiseSpec] = None, threshold: float = 0.5, output: Literal['full','summary','compact'] = 'full', # type: ignore lookback: int = 300, ) -> Dict[str, Any]: """Detect regimes and/or change-points over the last `limit` bars. - method: 'bocpd' (Bayesian online change-point; Gaussian), 'hmm' (Gaussian mixture/HMM-lite), or 'ms_ar' (Markov-switching AR via statsmodels if available). - target: 'return' (default; log returns) or 'price'. - params: method-specific kwargs, e.g., bocpd: hazard_lambda, max_run_length; hmm: n_states; ms_ar: k_regimes, order. - denoise: optional denoising on 'close' prior to target transform. - threshold: decision threshold for change-point marking (bocpd cp_prob >= threshold). - output: 'full' (default; all time series), 'summary' (stats only), or 'compact' (summary + tail series). """ try: p = dict(params or {}) df = _fetch_history(symbol, timeframe, int(max(limit, 50)), as_of=None) if len(df) < 10: return {"error": "Insufficient history"} base_col = 'close' if denoise: try: added = _apply_denoise_util(df, denoise, default_when='pre_ti') if f"{base_col}_dn" in added: base_col = f"{base_col}_dn" except Exception: pass y = df[base_col].astype(float).to_numpy() times = df['time'].astype(float).to_numpy() if target == 'return': with np.errstate(divide='ignore', invalid='ignore'): x = np.diff(np.log(np.maximum(y, 1e-12))) x = x[np.isfinite(x)] t = times[1: 1 + x.size] else: x = y[np.isfinite(y)] t = times[: x.size] # format times t_fmt = [_format_time_minimal_util(tt) for tt in t] if method == 'bocpd': from ..utils.regime import bocpd_gaussian hazard_lambda = int(p.get('hazard_lambda', 250)) max_rl = int(p.get('max_run_length', min(1000, x.size))) res = bocpd_gaussian(x, hazard_lambda=hazard_lambda, max_run_length=max_rl) cp_prob = res.get('cp_prob', np.zeros_like(x, dtype=float)) cp_idx = [int(i) for i, v in enumerate(cp_prob) if float(v) >= float(threshold)] cps = [{"idx": i, "time": t_fmt[i], "prob": float(cp_prob[i])} for i in cp_idx] payload = { "success": True, "symbol": symbol, "timeframe": timeframe, "method": method, "target": target, "times": t_fmt, "cp_prob": [float(v) for v in np.asarray(cp_prob, dtype=float).tolist()], "change_points": cps, "threshold": float(threshold), "params_used": {"hazard_lambda": hazard_lambda, "max_run_length": max_rl}, } if output in ('summary','compact'): n = min(int(lookback), len(cp_prob)) tail = np.asarray(cp_prob[-n:], dtype=float) if n > 0 else np.asarray(cp_prob, dtype=float) recent_cps = [c for c in cps if c.get('idx', 0) >= (len(cp_prob) - n)] summary = { "lookback": int(n), "last_cp_prob": float(cp_prob[-1]) if len(cp_prob) else float('nan'), "max_cp_prob": float(np.nanmax(tail)) if tail.size else float('nan'), "mean_cp_prob": float(np.nanmean(tail)) if tail.size else float('nan'), "change_points_count": int(len(recent_cps)), "recent_change_points": recent_cps[-5:], } if output == 'summary': payload = {"success": True, "symbol": symbol, "timeframe": timeframe, "method": method, "summary": summary} else: # compact: keep cp_prob only as tail payload["summary"] = summary if n > 0: payload["cp_prob"] = [float(v) for v in tail.tolist()] payload["times"] = t_fmt[-n:] payload["change_points"] = recent_cps return _attach_toon_rows(payload, method) elif method == 'ms_ar': try: from statsmodels.tsa.regime_switching.markov_regression import MarkovRegression # type: ignore except Exception: return {"error": "statsmodels MarkovRegression not available. Install statsmodels."} k_regimes = int(p.get('k_regimes', 2)) order = int(p.get('order', 0)) try: mod = MarkovRegression(endog=x, k_regimes=max(2, k_regimes), trend='c', order=max(0, order), switching_variance=True) res = mod.fit(disp=False, maxiter=int(p.get('maxiter', 100))) smoothed = res.smoothed_marginal_probabilities if hasattr(smoothed, "values"): smoothed = smoothed.values # choose most probable regime per time # smoothed shape is usually (T, K) state = np.argmax(smoothed, axis=1) probs = smoothed # shape (T, K) except Exception as ex: return {"error": f"MS-AR fitting error: {ex}"} payload = { "success": True, "symbol": symbol, "timeframe": timeframe, "method": method, "target": target, "times": t_fmt, "state": [int(s) for s in state.tolist()], "state_probabilities": [[float(v) for v in row] for row in probs.tolist()], "params_used": {"k_regimes": k_regimes, "order": order}, } if output in ('summary','compact'): n = min(int(lookback), len(state)) st_tail = state[-n:] if n > 0 else state last_s = int(state[-1]) if len(state) else None # Shares unique, counts = np.unique(st_tail, return_counts=True) shares = {int(k): float(c) / float(len(st_tail) or 1) for k, c in zip(unique, counts)} summary = {"lookback": int(n), "last_state": last_s, "state_shares": shares} if output == 'summary': payload = {"success": True, "symbol": symbol, "timeframe": timeframe, "method": method, "summary": summary} else: payload["summary"] = summary if n > 0: payload["state"] = [int(s) for s in st_tail.tolist()] payload["times"] = t_fmt[-n:] payload["state_probabilities"] = [[float(v) for v in row] for row in probs.tolist()][-n:] return _attach_toon_rows(payload, method) else: # 'hmm' (mixture/HMM-lite) try: from ..forecast.monte_carlo import fit_gaussian_mixture_1d except Exception as ex: return {"error": f"HMM-lite import error: {ex}"} n_states = int(p.get('n_states', 2)) w, mu, sigma, gamma, _ = fit_gaussian_mixture_1d(x, n_states=max(2, n_states)) state = np.argmax(gamma, axis=1) if gamma.ndim == 2 and gamma.shape[0] == x.size else np.zeros(x.size, dtype=int) payload = { "success": True, "symbol": symbol, "timeframe": timeframe, "method": method, "target": target, "times": t_fmt, "state": [int(s) for s in state.tolist()], "state_probabilities": [[float(v) for v in row] for row in (gamma.tolist() if gamma.ndim == 2 else np.zeros((x.size, n_states)).tolist())], "regime_params": {"weights": [float(v) for v in w.tolist()], "mu": [float(v) for v in mu.tolist()], "sigma": [float(v) for v in sigma.tolist()]}, } if output in ('summary','compact'): n = min(int(lookback), len(state)) st_tail = state[-n:] if n > 0 else state last_s = int(state[-1]) if len(state) else None unique, counts = np.unique(st_tail, return_counts=True) shares = {int(k): float(c) / float(len(st_tail) or 1) for k, c in zip(unique, counts)} # Order states by sigma ascending for interpretability order = np.argsort(sigma) ranks = {int(s): int(r) for r, s in enumerate(order)} summary = { "lookback": int(n), "last_state": last_s, "state_shares": shares, "state_sigma": {int(i): float(sigma[i]) for i in range(len(sigma))}, "state_order_by_sigma": ranks, } if output == 'summary': payload = {"success": True, "symbol": symbol, "timeframe": timeframe, "method": method, "summary": summary} else: payload["summary"] = summary if n > 0: payload["state"] = [int(s) for s in st_tail.tolist()] payload["times"] = t_fmt[-n:] # If gamma present, truncate probabilities too if isinstance(gamma, np.ndarray) and gamma.ndim == 2 and gamma.shape[0] >= n: payload["state_probabilities"] = [[float(v) for v in row] for row in gamma[-n:].tolist()] return _attach_toon_rows(payload, method) except Exception as e: return {"error": f"Error detecting regimes: {str(e)}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/emerzon/mt-data-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server