Skip to main content
Glama
emerzon

MetaTrader5 MCP Server

by emerzon
simplify.py23 kB
""" Simplification helpers extracted for reuse across server tools. Contains target-point selection utilities and core selection algorithms. """ from typing import Any, Dict, List, Optional, Tuple, Callable import math import pandas as pd import numpy as np # Local defaults to avoid circular import with core package during initialization. # Keep in sync with src/mtdata/core/constants.py SIMPLIFY_DEFAULT_RATIO = 0.25 SIMPLIFY_DEFAULT_MIN_POINTS = 100 SIMPLIFY_DEFAULT_MAX_POINTS = 500 def _default_target_points(total: int) -> int: """Default target points when simplify spec lacks explicit size. Uses SIMPLIFY_DEFAULT_RATIO bounded by [SIMPLIFY_DEFAULT_MIN_POINTS, SIMPLIFY_DEFAULT_MAX_POINTS]. """ try: t = int(round(total * SIMPLIFY_DEFAULT_RATIO)) t = max(SIMPLIFY_DEFAULT_MIN_POINTS, min(SIMPLIFY_DEFAULT_MAX_POINTS, t)) return max(3, min(t, total)) except Exception: return max(3, min(SIMPLIFY_DEFAULT_MIN_POINTS, total)) def _choose_simplify_points(total: int, spec: Dict[str, Any]) -> int: """Determine target number of points from a simplify spec. Supports keys: 'points', 'max_points', 'target_points', or 'ratio' (0..1). Enforces bounds [3, total]. Returns total if no effective reduction requested. """ try: if not spec: return total n = None for k in ("points", "max_points", "target_points"): if k in spec and spec[k] is not None: try: n = int(spec[k]) break except Exception: pass if n is None and "ratio" in spec and spec["ratio"] is not None: try: r = float(spec["ratio"]) if 0 < r < 1: n = int(max(3, round(total * r))) except Exception: pass if n is None: if spec and ("method" in spec or len(spec) > 0): return _default_target_points(total) return total n = max(3, min(int(n), total)) return n except Exception: return total # ---- Core selection algorithms ---- def _lttb_select_indices(x: List[float], y: List[float], n_out: int) -> List[int]: """Largest-Triangle-Three-Buckets (LTTB) downsampling. Returns the indices of selected points (monotonic, includes first/last). """ try: m = len(x) if n_out >= m: return list(range(m)) if n_out <= 2 or m <= 2: return [0, max(0, m - 1)] idxs: List[int] = [0] bucket_size = (m - 2) / float(n_out - 2) a = 0 for i in range(0, n_out - 2): start = int(math.floor(1 + i * bucket_size)) end = int(math.floor(1 + (i + 1) * bucket_size)) end = min(end, m - 1) if start >= end: start = max(min(start, m - 2), 1) end = start + 1 next_start = int(math.floor(1 + (i + 1) * bucket_size)) next_end = int(math.floor(1 + (i + 2) * bucket_size)) next_end = min(next_end, m - 1) if next_start >= next_end: next_start = max(min(next_start, m - 1), 1) next_end = next_start avg_x = 0.0 avg_y = 0.0 cnt = max(1, next_end - next_start) for j in range(next_start, next_end): avg_x += x[j] avg_y += y[j] avg_x /= float(cnt) avg_y /= float(cnt) ax = x[a] ay = y[a] max_area = -1.0 max_idx = start for j in range(start, end): area = abs((ax - avg_x) * (y[j] - ay) - (ax - x[j]) * (avg_y - ay)) if area > max_area: max_area = area max_idx = j idxs.append(max_idx) a = max_idx idxs.append(m - 1) out = [] last = -1 for ix in sorted(set(idxs)): if ix > last: out.append(ix) last = ix return out except Exception: return list(range(len(x))) def _point_line_distance(px: float, py: float, x1: float, y1: float, x2: float, y2: float) -> float: """Vertical distance from P to line y(x) through (x1,y1)-(x2,y2).""" try: dx = x2 - x1 if dx == 0.0: return abs(py - (y1 + y2) / 2.0) m = (y2 - y1) / dx y_on_line = y1 + m * (px - x1) return abs(py - y_on_line) except Exception: return 0.0 def _uniform_segment_indices(n: int, segments: int) -> List[int]: """Return indices that split a sequence of length n into `segments` segments.""" if n <= 2 or segments <= 1: return list(range(n)) segments = max(1, min(int(segments), n - 1)) idxs = [0] for k in range(1, segments): idxs.append(int(round(k * (n - 1) / segments))) idxs.append(n - 1) return sorted(set(idxs)) def _line_deviation_hi(x: List[float], y: List[float]) -> float: """Compute a robust upper bound for line-based deviation in [x,y].""" n = len(x) if n == 0: return 1.0 x0, y0 = x[0], y[0] x1, y1 = x[-1], y[-1] dx = x1 - x0 if dx == 0: base = sum(y) / float(n) return max(abs(v - base) for v in y) if n > 0 else 1.0 m = (y1 - y0) / dx hi = 0.0 for i in range(n): yline = y0 + m * (x[i] - x0) hi = max(hi, abs(y[i] - yline)) if hi <= 0: rng = (max(y) - min(y)) if n > 1 else 1.0 hi = max(1e-12, rng) return float(hi) def _abs_deviation_hi(y: List[float]) -> float: """Upper bound for absolute deviation from the mean for APCA.""" n = len(y) if n == 0: return 1.0 base = sum(y) / float(n) hi = max(abs(v - base) for v in y) if n > 0 else 1.0 if hi <= 0: rng = (max(y) - min(y)) if n > 1 else 1.0 hi = max(1e-12, rng) return float(hi) def _binary_search_param( target_points: int, lo: float, hi: float, iter_limit: int, compute_indices: Callable[[float], List[int]], ) -> Tuple[List[int], float]: """Generic binary search to find a parameter so that indices length ~ target_points. Returns (best_indices, best_param). On ties, prefers smaller parameter value. """ target = max(3, int(target_points)) best_idxs = compute_indices(lo) best_p = lo best_diff = abs(len(best_idxs) - target) for _ in range(max(1, int(iter_limit))): mid = (lo + hi) / 2.0 idxs = compute_indices(mid) cnt = len(idxs) diff = abs(cnt - target) if diff < best_diff or (diff == best_diff and mid < best_p): best_idxs = idxs best_p = mid best_diff = diff if cnt > target: lo = mid if mid > lo else lo + (hi - lo) * 0.5 elif cnt < target: hi = mid else: break if hi - lo <= 1e-12: break return best_idxs, float(best_p) def _rdp_select_indices(x: List[float], y: List[float], epsilon: float) -> List[int]: """Douglas–Peucker simplification returning kept indices.""" try: n = len(x) if n <= 2 or epsilon <= 0: return list(range(n)) stack: List[Tuple[int, int]] = [(0, n - 1)] keep = [False] * n keep[0] = True keep[n - 1] = True while stack: i0, i1 = stack.pop() x0, y0 = x[i0], y[i0] x1, y1 = x[i1], y[i1] idx = -1 dmax = 0.0 for i in range(i0 + 1, i1): d = _point_line_distance(x[i], y[i], x0, y0, x1, y1) if d > dmax: idx = i dmax = d if idx != -1 and dmax > epsilon: keep[idx] = True stack.append((i0, idx)) stack.append((idx, i1)) return [i for i, k in enumerate(keep) if k] except Exception: return list(range(len(x))) def _max_line_error(x: List[float], y: List[float], i0: int, i1: int) -> float: if i1 <= i0 + 1: return 0.0 x0, y0 = x[i0], y[i0] x1, y1 = x[i1], y[i1] m = 0.0 for i in range(i0 + 1, i1): d = _point_line_distance(x[i], y[i], x0, y0, x1, y1) if d > m: m = d return m def _finalize_indices(n: int, idxs: List[int]) -> List[int]: """Ensure last index included and indices are strictly increasing unique.""" if not idxs: return list(range(n)) if idxs[-1] != n - 1: idxs.append(n - 1) out: List[int] = [] for i in idxs: ii = int(i) if not out or ii > out[-1]: out.append(ii) return out def _greedy_segment_indices(n: int, check_ok: Callable[[int, int], bool]) -> List[int]: """Generic greedy grow-and-split segment index selector using a monotone check. Expands [start:end] while check_ok(start,end) holds, then commits the last good end. """ idxs: List[int] = [0] start = 0 while start < n - 1: end = start + 1 last_good = end while end < n: if check_ok(start, end): last_good = end end += 1 else: break if last_good == start: last_good = start + 1 idxs.append(last_good) start = last_good return idxs def _pla_select_indices( x: List[float], y: List[float], max_error: Optional[float] = None, segments: Optional[int] = None, points: Optional[int] = None, ) -> List[int]: """Piecewise Linear Approximation; greedy by max perpendicular error or uniform segments.""" n = len(x) if n <= 2: return list(range(n)) if (max_error is None or max_error <= 0) and (segments or points): if points and (segments is None): segments = max(1, int(points) - 1) return _uniform_segment_indices(n, int(segments or 1)) if max_error is None or max_error <= 0: return list(range(n)) idxs = _greedy_segment_indices(n, lambda s, e: _max_line_error(x, y, s, e) <= float(max_error)) return _finalize_indices(n, idxs) def _apca_select_indices( y: List[float], max_error: Optional[float] = None, segments: Optional[int] = None, points: Optional[int] = None, ) -> List[int]: """Adaptive Piecewise Constant Approximation; greedy by max absolute deviation or uniform.""" n = len(y) if n <= 2: return list(range(n)) if (max_error is None or max_error <= 0) and (segments or points): if points and (segments is None): segments = max(1, int(points) - 1) return _uniform_segment_indices(n, int(segments or 1)) if max_error is None or max_error <= 0: return list(range(n)) def _ok(s: int, e: int) -> bool: seg = y[s:e + 1] mean = sum(seg) / float(len(seg)) err = max(abs(v - mean) for v in seg) return err <= float(max_error) idxs = _greedy_segment_indices(n, _ok) return _finalize_indices(n, idxs) def _rdp_autotune_epsilon(x: List[float], y: List[float], target_points: int, max_iter: int = 24) -> Tuple[List[int], float]: """Binary search epsilon so RDP keeps ~target_points. Returns (indices, epsilon).""" n = len(x) target = max(3, min(int(target_points), n)) if target >= n: return list(range(n)), 0.0 lo = 0.0 hi = _line_deviation_hi(x, y) return _binary_search_param(target, lo, hi, max_iter, lambda mid: _rdp_select_indices(x, y, mid)) def _pla_autotune_max_error(x: List[float], y: List[float], target_points: int, max_iter: int = 24) -> Tuple[List[int], float]: """Binary search max_error so PLA keeps ~target_points. Returns (indices, max_error).""" n = len(x) target = max(3, min(int(target_points), n)) if target >= n: return list(range(n)), 0.0 lo = 0.0 hi = _line_deviation_hi(x, y) return _binary_search_param(target, lo, hi, max_iter, lambda mid: _pla_select_indices(x, y, mid, None, None)) def _apca_autotune_max_error(y: List[float], target_points: int, max_iter: int = 24) -> Tuple[List[int], float]: """Binary search max_error so APCA keeps ~target_points. Returns (indices, max_error).""" n = len(y) target = max(3, min(int(target_points), n)) if target >= n: return list(range(n)), 0.0 lo = 0.0 hi = _abs_deviation_hi(y) return _binary_search_param(target, lo, hi, max_iter, lambda mid: _apca_select_indices(y, mid, None, None)) def _select_indices_for_timeseries(x: List[float], y: List[float], spec: Optional[Dict[str, Any]]) -> Tuple[List[int], str, Dict[str, Any]]: """Select representative indices according to simplify spec. Returns (indices, method_used, params_meta). """ meta: Dict[str, Any] = {} if not spec: return list(range(len(x))), "none", meta method = str(spec.get("method", "lttb")).lower().strip() if method in ("lttb", "default"): n_out = _choose_simplify_points(len(x), spec) idxs = _lttb_select_indices(x, y, n_out) meta.update({"points": n_out}) return idxs, "lttb", meta if method == "rdp": eps = spec.get("epsilon", None) or spec.get("tolerance", None) or spec.get("eps", None) try: eps_val = float(eps) if eps is not None else None except Exception: eps_val = None if eps_val is not None and eps_val > 0: idxs = _rdp_select_indices(x, y, eps_val) meta.update({"epsilon": eps_val}) return idxs, "rdp", meta target = spec.get("points") or spec.get("target_points") or spec.get("max_points") or None if target is None and spec.get("ratio") is not None: try: r = float(spec.get("ratio")) if 0 < r < 1: target = max(3, int(round(len(x) * r))) except Exception: pass if target is None: target = _default_target_points(len(x)) try: target_n = int(target) if target is not None else None except Exception: target_n = None if target_n is not None and target_n < len(x): idxs, eps_used = _rdp_autotune_epsilon(x, y, target_n) meta.update({"epsilon": eps_used, "points": len(idxs), "auto_tuned": True}) return idxs, "rdp", meta n_out = _choose_simplify_points(len(x), spec) idxs = _lttb_select_indices(x, y, n_out) meta.update({"points": n_out, "fallback": "rdp->lttb"}) return idxs, "lttb", meta if method == "pla": max_error = spec.get("max_error", None) try: me = float(max_error) if max_error is not None else None except Exception: me = None segments = spec.get("segments", None) points = spec.get("points", None) or spec.get("target_points", None) or spec.get("max_points", None) if me is not None and me > 0: idxs = _pla_select_indices(x, y, me, None, None) meta.update({"max_error": me}) return idxs, "pla", meta if segments is not None: idxs = _pla_select_indices(x, y, None, segments, None) meta.update({"segments": segments}) return idxs, "pla", meta try: p = int(points) if points is not None else None except Exception: p = None if p is None: p = _default_target_points(len(x)) if p is not None and p < len(x): idxs, me_used = _pla_autotune_max_error(x, y, p) meta.update({"max_error": me_used, "points": len(idxs), "auto_tuned": True}) return idxs, "pla", meta n_out = _choose_simplify_points(len(x), spec) idxs = _lttb_select_indices(x, y, n_out) meta.update({"points": n_out, "fallback": "pla->lttb"}) return idxs, "lttb", meta if method == "apca": max_error = spec.get("max_error", None) try: me = float(max_error) if max_error is not None else None except Exception: me = None segments = spec.get("segments", None) points = spec.get("points", None) or spec.get("target_points", None) or spec.get("max_points", None) if me is not None and me > 0: idxs = _apca_select_indices(y, me, None, None) meta.update({"max_error": me}) return idxs, "apca", meta if segments is not None: idxs = _apca_select_indices(y, None, segments, None) meta.update({"segments": segments}) return idxs, "apca", meta try: p = int(points) if points is not None else None except Exception: p = None if p is None: p = _default_target_points(len(x)) if p is not None and p < len(x): idxs, me_used = _apca_autotune_max_error(y, p) meta.update({"max_error": me_used, "points": len(idxs), "auto_tuned": True}) return idxs, "apca", meta n_out = _choose_simplify_points(len(x), spec) idxs = _lttb_select_indices(x, y, n_out) meta.update({"points": n_out, "fallback": "apca->lttb"}) return idxs, "lttb", meta n_out = _choose_simplify_points(len(x), spec) idxs = _lttb_select_indices(x, y, n_out) meta.update({"points": n_out, "fallback": f"{method}->lttb"}) return idxs, "lttb", meta def _simplify_dataframe_rows(df: pd.DataFrame, headers: List[str], simplify: Optional[Dict[str, Any]]) -> Tuple[pd.DataFrame, Optional[Dict[str, Any]]]: """Reduce or transform rows across numeric columns. Modes (simplify['mode']): - 'select' (default): pick representative existing rows using the chosen method. - 'approximate': partition by selected breakpoints and aggregate numeric columns per segment. - 'resample': time-based bucketing by '__epoch' with 'bucket_seconds'; aggregates numeric columns. - 'encode': transform per-row representation to a compact schema (e.g., envelope or delta) and optionally pre-select rows before encoding. Aggregation: mean for numeric columns; first value for non-numeric columns like 'time'. """ if not simplify: return df, None try: total = len(df) if total <= 3: return df, None # Import constants to avoid circular imports from ..core.constants import SIMPLIFY_DEFAULT_METHOD, SIMPLIFY_DEFAULT_MODE method = str(simplify.get("method", SIMPLIFY_DEFAULT_METHOD)).lower().strip() mode = str(simplify.get("mode", SIMPLIFY_DEFAULT_MODE)).lower().strip() or SIMPLIFY_DEFAULT_MODE # If users passed a high-level mode via --simplify (CLI maps to 'method'), map it to mode if method in ("encode", "symbolic", "segment"): explicit_mode = str(simplify.get("mode", "")).lower().strip() if explicit_mode in ("", SIMPLIFY_DEFAULT_MODE, "select"): mode = method # Helper: numeric columns in requested headers order, then any others def _numeric_columns_from_headers() -> List[str]: cols: List[str] = [] for h in headers: if h in ('time',) or str(h).startswith('_'): continue try: if h in df.columns and pd.api.types.is_numeric_dtype(df[h]): cols.append(h) except Exception: continue if not cols: for c in df.columns: if c in ('time',) or str(c).startswith('_'): continue try: if pd.api.types.is_numeric_dtype(df[c]): cols.append(c) except Exception: continue return cols # Aggregation helper for approximate/resample modes def _aggregate_segment(i0: int, i1: int) -> Dict[str, Any]: seg = df.iloc[i0:i1] row: Dict[str, Any] = {} if "time" in df.columns and "time" in headers: row["time"] = seg.iloc[0]["time"] for col in headers: if col == "time": continue if col in seg.columns and pd.api.types.is_numeric_dtype(seg[col]): row[col] = float(seg[col].mean()) elif col in seg.columns: try: row[col] = next((v for v in seg[col].tolist() if pd.notna(v)), seg.iloc[0][col]) except Exception: row[col] = seg.iloc[0][col] return row # Determine target number of points early (used for select and to infer resample/encode) n_out = _choose_simplify_points(total, simplify) if mode == "resample" and "__epoch" in df.columns: bs = simplify.get("bucket_seconds") if bs is None or (isinstance(bs, (int, float)) and bs <= 0): try: # Infer bucket by total time span / target buckets t0 = float(df["__epoch"].iloc[0]) t1 = float(df["__epoch"].iloc[-1]) span = max(1.0, t1 - t0) bs = max(1, int(round(span / max(1, n_out)))) except Exception: # Fallback to rough bucket from count bs = max(1, int(round(total / float(max(1, n_out))))) try: bs = max(1, int(bs)) except Exception: bs = max(1, int(round(total / float(max(1, n_out))))) grp = ((df["__epoch"].astype(float) - float(df["__epoch"].iloc[0])) // bs).astype(int) out_rows: List[Dict[str, Any]] = [] for _, seg in df.groupby(grp): i0 = seg.index[0] i1 = seg.index[-1] + 1 out_rows.append(_aggregate_segment(i0, i1)) out_df = pd.DataFrame(out_rows) meta = { "mode": "resample", "method": method or SIMPLIFY_DEFAULT_METHOD, "bucket_seconds": int(bs), "original_rows": total, "returned_rows": len(out_df), "points": len(out_df), } return out_df.reset_index(drop=True), meta # For other modes, delegate to services/simplification for now # This maintains compatibility while allowing future consolidation from ..services.simplification import _simplify_dataframe_rows_ext as _simplify_ext return _simplify_ext(df, headers, simplify) except Exception: return df, None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/emerzon/mt-data-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server