Skip to main content
Glama
emerzon

MetaTrader5 MCP Server

by emerzon
patterns.py31.1 kB
from __future__ import annotations from dataclasses import dataclass from typing import Dict, List, Optional, Tuple, Any import numpy as np import pandas as pd from scipy.spatial.ckdtree import cKDTree try: import hnswlib as _HNSW # type: ignore except Exception: _HNSW = None # optional ANN backend # Optional matrix profile backend try: import stumpy as _stumpy # type: ignore except Exception: _stumpy = None # Optional DTW/Soft-DTW backends try: from tslearn.metrics import dtw as _ts_dtw # type: ignore except Exception: _ts_dtw = None try: from tslearn.metrics import soft_dtw as _ts_soft_dtw # type: ignore except Exception: _ts_soft_dtw = None try: from dtaidistance import dtw as _dd_dtw # type: ignore except Exception: _dd_dtw = None # Dimensionality reduction abstraction from .dimred import create_reducer as _create_reducer, DimReducer as _DimReducer # Reuse existing MT5 helpers and denoise utilities from ..core.constants import TIMEFRAME_MAP, TIMEFRAME_SECONDS from .mt5 import _mt5_copy_rates_from, _mt5_epoch_to_utc from .denoise import _apply_denoise as _apply_denoise_util from .utils import to_float_np, align_finite def _minmax_scale_row(x: np.ndarray) -> np.ndarray: x = np.asarray(x, dtype=float) if x.size == 0: return x mn = np.nanmin(x) mx = np.nanmax(x) rng = float(mx - mn) if not np.isfinite(rng) or rng <= 1e-12: return np.zeros_like(x, dtype=np.float32) y = (x - mn) / rng return y.astype(np.float32, copy=False) def _mass_distance_profile(query: np.ndarray, series: np.ndarray, *, eps: float = 1e-12) -> np.ndarray: """Compute z-normalized sliding distances between query and all subsequences in series using FFT. Returns an array of length len(series) - len(query) + 1. Non-finite windows are mapped to inf. """ q = np.asarray(query, dtype=float).ravel() s = np.asarray(series, dtype=float).ravel() m = q.size n = s.size if m == 0 or n < m: return np.array([], dtype=float) q_std = float(np.nanstd(q)) if not np.isfinite(q_std) or q_std <= eps: return np.full(max(n - m + 1, 0), np.inf, dtype=float) q_norm = (q - float(np.nanmean(q))) / q_std # Fast convolution via FFT to get dot products k = int(1 << (n + m - 1).bit_length()) # next power of two for speed q_rev = np.flip(q_norm) q_rev = np.pad(q_rev, (0, k - m)) s_pad = np.pad(s, (0, k - n)) fft_q = np.fft.fft(q_rev) fft_s = np.fft.fft(s_pad) conv = np.fft.ifft(fft_q * fft_s).real cross = conv[m - 1 : m - 1 + (n - m + 1)] # Rolling mean/std for series windows cumsum = np.cumsum(s, dtype=float) cumsum2 = np.cumsum(s * s, dtype=float) window_sum = cumsum[m - 1 :] - np.concatenate(([0.0], cumsum[:-m])) window_sum2 = cumsum2[m - 1 :] - np.concatenate(([0.0], cumsum2[:-m])) means = window_sum / m stds = np.sqrt(np.maximum(window_sum2 / m - means * means, 0.0)) stds[stds <= eps] = np.nan denom = m * stds with np.errstate(invalid="ignore", divide="ignore"): dist2 = 2.0 * m * (1.0 - (cross / denom)) dist2[~np.isfinite(dist2)] = np.inf dist2 = np.maximum(dist2, 0.0) return np.sqrt(dist2) @dataclass class _SeriesStore: symbol: str time_epoch: np.ndarray # float64 UTC epoch seconds, ascending close: np.ndarray # float64, ascending class PatternIndex: """In-memory sliding-window index for pattern similarity search. - Builds per-window min-max normalized vectors of length `window_size` from MT5 close prices. - Uses cKDTree for fast L2 nearest neighbor search. - Keeps mappings to reconstruct symbol, dates, and original values (+future). """ def __init__( self, timeframe: str, window_size: int, future_size: int, symbols: List[str], tree: Any, X: np.ndarray, start_end_idx: np.ndarray, labels: np.ndarray, series: List[_SeriesStore], scale: str = "minmax", metric: str = "euclidean", pca_components: Optional[int] = None, pca_model: Optional[object] = None, dimred_method: Optional[str] = None, dimred_params: Optional[Dict[str, Any]] = None, reducer: Optional[_DimReducer] = None, engine: str = "ckdtree", max_bars_per_symbol: int = 5000, ): self.timeframe = timeframe self.window_size = int(window_size) self.future_size = int(future_size) self.symbols = list(symbols) self.tree = tree self.X = X self.start_end_idx = start_end_idx # shape (N,2) self.labels = labels # shape (N,) self._series = series # list aligned with label indices self.scale = (scale or "minmax").lower() self.metric = (metric or "euclidean").lower() # Back-compat: keep PCA fields, while new reducer API is used going forward self.pca_components = int(pca_components) if pca_components else None self._pca = pca_model self.dimred_method = (dimred_method or ("pca" if self.pca_components else "none")).lower() self.dimred_params = dict(dimred_params or ({} if not self.pca_components else {"n_components": int(self.pca_components)})) self._reducer = reducer # type: ignore self.engine = (engine or "ckdtree").lower() self.max_bars_per_symbol = int(max_bars_per_symbol) def search(self, anchor_values: np.ndarray, top_k: int = 5) -> Tuple[np.ndarray, np.ndarray]: """Query by a raw (unscaled) anchor window. Returns (indices, distances).""" v = np.asarray(anchor_values, dtype=float).ravel() if v.size != self.window_size: raise ValueError(f"anchor_values must be length {self.window_size}") if self.engine in ("matrix_profile", "mass"): return self._profile_search(v, top_k=top_k) q = v.astype(float) # Scale q = _apply_scale_vector(q, self.scale) # Dimensionality reduction (new API), falling back to PCA model if present if self._reducer is not None: if not self._reducer.supports_transform(): raise RuntimeError(f"Reducer '{self.dimred_method}' does not support transforming new samples") q = np.asarray(self._reducer.transform(q.reshape(1, -1)), dtype=np.float32).ravel() elif self._pca is not None: q = np.asarray(self._pca.transform(q.reshape(1, -1))[0], dtype=np.float32) # Metric post-process q = _apply_metric_vector(q, self.metric) k = min(int(top_k), len(self.X)) if self.engine == "hnsw": # hnswlib with 'l2' space returns squared L2 distances; take sqrt to match cKDTree labels, distances = self.tree.knn_query(q.reshape(1, -1).astype(np.float32), k=k) idxs = labels[0].astype(int) dists = np.sqrt(distances[0].astype(float)) return idxs, dists else: dists, idxs = self.tree.query(q, k=k) # Ensure 1D arrays if np.ndim(idxs) == 0: idxs = np.asarray([int(idxs)]) dists = np.asarray([float(dists)]) return idxs.astype(int), dists.astype(float) def _profile_search(self, anchor_values: np.ndarray, top_k: int) -> Tuple[np.ndarray, np.ndarray]: """Sliding search using matrix profile / MASS style distances.""" if self.scale not in ("zscore",): raise ValueError("matrix_profile/mass engines require scale='zscore'") if self.metric not in ("euclidean", "l2"): raise ValueError("matrix_profile/mass engines require metric='euclidean'") q = np.asarray(anchor_values, dtype=float).ravel() m = q.size idxs_all: List[int] = [] dists_all: List[float] = [] offset = 0 for ser in self._series: n = ser.close.size limit = n - (self.window_size + self.future_size) + 1 if limit <= 0: continue series_slice = ser.close[: limit + self.window_size + max(self.future_size - 1, 0)] if series_slice.size < m: offset += max(limit, 0) continue if self.engine == "matrix_profile": if _stumpy is None: raise RuntimeError("matrix_profile engine requested but 'stumpy' is not installed") # AB-join: distances from every subsequence in series_slice to the query subsequence mp = _stumpy.stump(series_slice.astype(float), m, T_B=q.astype(float), ignore_trivial=False) profile = np.asarray(mp[:, 0], dtype=float) else: profile = _mass_distance_profile(q, series_slice) if profile.size > limit: profile = profile[:limit] for i, d in enumerate(profile.tolist()): idxs_all.append(offset + i) dists_all.append(float(d)) offset += max(limit, 0) if not idxs_all: return np.array([], dtype=int), np.array([], dtype=float) order = np.argsort(np.asarray(dists_all, dtype=float)) k = min(int(top_k), order.size) sel = order[:k] return np.asarray([idxs_all[i] for i in sel], dtype=int), np.asarray([dists_all[i] for i in sel], dtype=float) def get_match_symbol(self, index: int) -> str: lbl = int(self.labels[int(index)]) return self._series[lbl].symbol def get_match_times(self, index: int, include_future: bool = True) -> np.ndarray: s, e = self.start_end_idx[int(index)] ser = self._series[int(self.labels[int(index)])] if include_future and self.future_size > 0: end = min(int(e + self.future_size), len(ser.time_epoch) - 1) start = max(0, int(s)) else: start, end = int(s), int(e) return ser.time_epoch[start : end + 1] def get_match_values(self, index: int, include_future: bool = True) -> np.ndarray: s, e = self.start_end_idx[int(index)] ser = self._series[int(self.labels[int(index)])] if include_future and self.future_size > 0: end = min(int(e + self.future_size), len(ser.close) - 1) start = max(0, int(s)) else: start, end = int(s), int(e) return ser.close[start : end + 1] def _scaled_window(self, vals: np.ndarray) -> np.ndarray: # Apply the same scaling as index vectors for fair comparison return _apply_scale_vector(np.asarray(vals, dtype=float), self.scale) def _ncc_max(self, a: np.ndarray, b: np.ndarray, max_lag: int) -> float: """Compute maximum normalized cross-correlation within +/- max_lag. a and b are same-length 1D arrays (window only). """ a = np.asarray(a, dtype=float).ravel() b = np.asarray(b, dtype=float).ravel() n = int(min(a.size, b.size)) if n <= 2: return 0.0 # Z-normalize for correlation def znorm(x: np.ndarray) -> np.ndarray: xm = float(np.nanmean(x)) xs = float(np.nanstd(x)) if not np.isfinite(xs) or xs <= 1e-12: return np.zeros_like(x, dtype=float) return (x - xm) / xs a = znorm(a) b = znorm(b) L = int(max(0, max_lag)) best = -1.0 for lag in range(-L, L + 1): if lag == 0: aa, bb = a, b elif lag > 0: aa = a[lag:] bb = b[: n - lag] else: # lag < 0 aa = a[: n + lag] bb = b[-lag:] m = int(min(aa.size, bb.size)) if m <= 2: continue # Cosine similarity == correlation since both z-normalized num = float(np.dot(aa, bb)) den = float(np.linalg.norm(aa) * np.linalg.norm(bb)) if not np.isfinite(den) or den <= 1e-12: corr = 0.0 else: corr = num / den if corr > best: best = corr if not np.isfinite(best): best = 0.0 return float(max(min(best, 1.0), -1.0)) def refine_matches( self, anchor_values: np.ndarray, idxs: np.ndarray, dists: np.ndarray, top_k: int, shape_metric: Optional[str] = None, allow_lag: int = 0, dtw_band_frac: Optional[float] = None, soft_dtw_gamma: Optional[float] = None, affine_alpha_min: float = 0.5, affine_alpha_max: float = 2.0, affine_penalty: float = 0.0, ) -> Tuple[np.ndarray, np.ndarray]: """Re-rank candidates using a shape metric (e.g., NCC with lag) and return top_k. shape_metric: - 'ncc': normalized cross-correlation with +/- allow_lag bar shifts - None/'none': no re-ranking """ if shape_metric is None: shape_metric = 'none' sm = str(shape_metric).lower().strip() if sm in ("", "none"): # No refinement; just truncate k = min(int(top_k), idxs.size) return idxs[:k], dists[:k] # Prepare scaled anchor window (window part only) a = self._scaled_window(np.asarray(anchor_values, dtype=float)) scores: List[Tuple[float, int]] = [] max_lag = int(allow_lag) if allow_lag and int(allow_lag) > 0 else 0 for idx, _d in zip(idxs.tolist(), dists.tolist()): w = self.get_match_values(int(idx), include_future=False) w = self._scaled_window(w) if sm == 'ncc': corr = self._ncc_max(a, w, max_lag) # Convert to distance-like score: lower is better score = 1.0 - float(corr) elif sm == 'affine': # Fit alpha, beta minimizing ||a - (alpha*w + beta)||_2 aw = float(np.dot(a - np.mean(a), w - np.mean(w))) ww = float(np.dot(w - np.mean(w), w - np.mean(w))) alpha = (aw / ww) if (np.isfinite(ww) and ww > 1e-12) else 0.0 # Constrain alpha alpha = max(float(affine_alpha_min), min(float(affine_alpha_max), float(alpha))) beta = float(np.mean(a) - alpha * np.mean(w)) resid = a - (alpha * w + beta) rmse = float(np.sqrt(np.mean(resid * resid))) # Optional penalty to discourage extreme scaling score = rmse + float(affine_penalty) * abs(float(alpha) - 1.0) elif sm in ('dtw', 'softdtw'): # Compute DTW/Soft-DTW distance; fallback to simple DP if libs unavailable n = a.size band = None if dtw_band_frac is not None and dtw_band_frac > 0: band = max(1, int(round(float(dtw_band_frac) * n))) if sm == 'dtw': dist = None if _ts_dtw is not None: try: if band: dist = float(_ts_dtw(a, w, global_constraint="sakoe_chiba", sakoe_chiba_radius=int(band))) else: dist = float(_ts_dtw(a, w)) except Exception: dist = None if dist is None and _dd_dtw is not None: try: if band: dist = float(_dd_dtw.distance_fast(a, w, window=band)) else: dist = float(_dd_dtw.distance_fast(a, w)) except Exception: dist = None if dist is None: # Simple O(n^2) DTW DP fallback (no band) ca = np.full((n + 1, n + 1), np.inf, dtype=float) ca[0, 0] = 0.0 for i in range(1, n + 1): for j in range(1, n + 1): cost = abs(a[i - 1] - w[j - 1]) ca[i, j] = cost + min(ca[i - 1, j], ca[i, j - 1], ca[i - 1, j - 1]) dist = float(ca[n, n]) score = dist else: # softdtw dist = None if _ts_soft_dtw is not None: try: gamma = float(soft_dtw_gamma) if (soft_dtw_gamma is not None and soft_dtw_gamma > 0) else 1.0 dist = float(_ts_soft_dtw(a.reshape(1, -1), w.reshape(1, -1), gamma=gamma)) except Exception: dist = None if dist is None: # Fallback to DTW if soft-DTW unavailable if _ts_dtw is not None: try: if band: dist = float(_ts_dtw(a, w, global_constraint="sakoe_chiba", sakoe_chiba_radius=int(band))) else: dist = float(_ts_dtw(a, w)) except Exception: dist = None if dist is None and _dd_dtw is not None: try: if band: dist = float(_dd_dtw.distance_fast(a, w, window=band)) else: dist = float(_dd_dtw.distance_fast(a, w)) except Exception: dist = None if dist is None: # Final fallback to simple DTW ca = np.full((n + 1, n + 1), np.inf, dtype=float) ca[0, 0] = 0.0 for i in range(1, n + 1): for j in range(1, n + 1): cost = abs(a[i - 1] - w[j - 1]) ca[i, j] = cost + min(ca[i - 1, j], ca[i, j - 1], ca[i - 1, j - 1]) dist = float(ca[n, n]) score = dist else: # Fallback to euclidean on scaled windows diff = a - w score = float(np.sqrt(np.dot(diff, diff))) scores.append((score, int(idx))) scores.sort(key=lambda x: x[0]) take = min(int(top_k), len(scores)) new_idxs = np.array([i for _, i in scores[:take]], dtype=int) new_scores = np.array([s for s, _ in scores[:take]], dtype=float) return new_idxs, new_scores def bars_per_symbol(self) -> Dict[str, int]: out: Dict[str, int] = {} for ser in self._series: out[ser.symbol] = int(len(ser.close)) return out def windows_per_symbol(self) -> Dict[str, int]: out: Dict[str, int] = {} for ser in self._series: n = int(len(ser.close)) w = max(0, n - (self.window_size + self.future_size) + 1) out[ser.symbol] = w return out def get_symbol_series(self, symbol: str) -> Optional[np.ndarray]: for ser in self._series: if ser.symbol == symbol: return ser.close return None def get_symbol_returns(self, symbol: str, lookback: int = 1000) -> Optional[np.ndarray]: arr = self.get_symbol_series(symbol) if arr is None or len(arr) < 3: return None # Simple log returns to stabilize scale x = np.asarray(arr, dtype=float) with np.errstate(divide='ignore', invalid='ignore'): ret = np.diff(np.log(x)) ret = ret[np.isfinite(ret)] if ret.size <= 0: return None if lookback and lookback > 0 and ret.size > lookback: ret = ret[-int(lookback):] return ret.astype(np.float32, copy=False) def _fetch_symbol_df( symbol: str, timeframe: str, bars: int, *, as_of: Optional[Any] = None, drop_last_live: bool = True, ) -> pd.DataFrame: """Fetch last `bars` candles for symbol/timeframe. Returns DataFrame with columns at least ['time','open','high','low','close','tick_volume','real_volume'] where available. 'time' is UTC epoch seconds as float. """ tf = TIMEFRAME_MAP.get(timeframe) if tf is None: raise ValueError(f"Unknown timeframe: {timeframe}") # Use a small guard for extra bars; server fetch typically asks +2 if as_of is not None: try: to_dt = pd.to_datetime(as_of) if to_dt.tzinfo is None: to_dt = to_dt.tz_localize("UTC") to_dt = to_dt.to_pydatetime() except Exception: to_dt = pd.Timestamp.utcnow().to_pydatetime() else: to_dt = pd.Timestamp.utcnow().to_pydatetime() rates = _mt5_copy_rates_from(symbol, tf, to_dt, int(bars) + 2) if rates is None or len(rates) == 0: raise RuntimeError(f"Failed to fetch rates for {symbol}") df = pd.DataFrame(rates) # Normalize epoch to UTC epoch float seconds try: if 'time' in df.columns: df['time'] = df['time'].astype(float).apply(_mt5_epoch_to_utc) except Exception: pass if drop_last_live and as_of is None and len(df) >= 2: df = df.iloc[:-1] # Keep last `bars` rows if len(df) > bars: df = df.iloc[-bars:].copy() return df def _prepare_series( symbol: str, timeframe: str, max_bars: int, denoise: Optional[Dict[str, Any]] = None, *, as_of: Optional[Any] = None, drop_last_live: bool = True, ) -> Optional[_SeriesStore]: """Fetch and optionally denoise a symbol series; return _SeriesStore with ascending times.""" df = _fetch_symbol_df(symbol, timeframe, max_bars, as_of=as_of, drop_last_live=drop_last_live) # Ensure 'volume' exists for denoise convenience if 'volume' not in df.columns and 'tick_volume' in df.columns: df['volume'] = df['tick_volume'] # Apply optional denoise to 'close' in-place if denoise and isinstance(denoise, dict): try: dn = dict(denoise) # Default pre-window denoise, apply to 'close' only unless caller overrode dn.setdefault('when', 'pre_ti') dn.setdefault('columns', ['close']) dn.setdefault('keep_original', False) _apply_denoise_util(df, dn, default_when='pre_ti') except Exception: # Fallback to raw if denoise fails pass # Extract arrays (ascending time assumed) try: # Convert and align by finite mask across both arrays t, c = align_finite(df['time'], df['close']) if t.size < 10: return None return _SeriesStore(symbol=symbol, time_epoch=t, close=c) except Exception: return None def build_index( symbols: List[str], timeframe: str, window_size: int, future_size: int, max_bars_per_symbol: int = 5000, denoise: Optional[Dict[str, Any]] = None, scale: str = "minmax", metric: str = "euclidean", pca_components: Optional[int] = None, # New flexible dimension reduction interface dimred_method: Optional[str] = None, dimred_params: Optional[Dict[str, Any]] = None, engine: str = "ckdtree", *, as_of: Optional[Any] = None, drop_last_live: bool = True, ) -> PatternIndex: """Build a PatternIndex from MT5 data for the provided symbols. Notes: - Windows are created over ascending time arrays. Window i corresponds to close[i : i+window_size]. Matches can expose window+future values. - Per-window min-max normalization is applied for the index vectors. """ assert window_size >= 5, "window_size too small" symbols_ok: List[str] = [] series: List[_SeriesStore] = [] for sym in symbols: ser = _prepare_series( sym, timeframe, max_bars=max_bars_per_symbol, denoise=denoise, as_of=as_of, drop_last_live=drop_last_live, ) if ser is None: continue # Require enough bars for at least one window if ser.close.size >= (window_size + future_size): series.append(ser) symbols_ok.append(sym) if not series: raise RuntimeError("No symbols had sufficient data to build pattern index") # Build windows X_list: List[np.ndarray] = [] start_end: List[Tuple[int, int]] = [] labels: List[int] = [] for lbl, ser in enumerate(series): n = ser.close.size limit = n - (window_size + future_size) + 1 if limit <= 0: continue # Create sliding indices starts = np.arange(limit, dtype=int) ends = starts + (window_size - 1) # Gather windows using stride-based indexing idx = starts[:, None] + np.arange(window_size)[None, :] w = ser.close[idx] # Apply per-row scaling sc = (scale or "minmax").lower() if sc == "zscore": mu = np.nanmean(w, axis=1, keepdims=True) sd = np.nanstd(w, axis=1, keepdims=True) sd[sd <= 1e-12] = 1.0 X_scaled = ((w - mu) / sd).astype(np.float32) elif sc == "none": X_scaled = w.astype(np.float32) else: # minmax mn = np.nanmin(w, axis=1, keepdims=True) mx = np.nanmax(w, axis=1, keepdims=True) rng = (mx - mn) rng[rng <= 1e-12] = 1.0 X_scaled = ((w - mn) / rng).astype(np.float32) X_list.append(X_scaled) start_end.extend(list(np.stack([starts, ends], axis=1))) labels.extend([lbl] * starts.size) if not X_list: raise RuntimeError("Failed to create any windows for the provided symbols") X = np.vstack(X_list) # Optional dimensionality reduction pca_model = None reducer: Optional[_DimReducer] = None # Back-compat: if pca_components provided, prefer PCA effective_dimred_method = (dimred_method or ("pca" if (pca_components and int(pca_components) > 0) else "none")) effective_dimred_params: Dict[str, Any] = dict(dimred_params or {}) if (pca_components and int(pca_components) > 0) and (not dimred_method or str(dimred_method).lower() in ("", "none", "pca")): # Ensure components bound to window size effective_dimred_params.setdefault("n_components", max(1, min(int(pca_components), int(X.shape[1])))) effective_dimred_method = "pca" if effective_dimred_method and str(effective_dimred_method).lower() not in ("none", "false"): reducer, info = _create_reducer(effective_dimred_method, effective_dimred_params) # If reducer requires n_components, ensure it does not exceed window length try: if hasattr(reducer, "n_components"): nc = int(getattr(reducer, "n_components")) if nc > int(X.shape[1]): # Recreate reducer with clipped components effective_dimred_params["n_components"] = int(X.shape[1]) reducer, info = _create_reducer(effective_dimred_method, effective_dimred_params) except Exception: pass X = reducer.fit_transform(X) X = X.astype(np.float32, copy=False) # Metric transform (for cosine/correlation) met = (metric or "euclidean").lower() if met == "cosine": # L2-normalize rows norms = np.linalg.norm(X, axis=1, keepdims=True) norms[norms <= 1e-12] = 1.0 X = (X / norms).astype(np.float32) elif met == "correlation": # If not PCA-centered already, re-center rows; then L2 normalize if pca_model is None: X = X - np.nanmean(X, axis=1, keepdims=True) norms = np.linalg.norm(X, axis=1, keepdims=True) norms[norms <= 1e-12] = 1.0 X = (X / norms).astype(np.float32) start_end_idx = np.asarray(start_end, dtype=int) labels_arr = np.asarray(labels, dtype=int) eng = (engine or "ckdtree").lower() if eng in ("matrix_profile", "mass"): tree_obj = None # search will bypass tree elif eng == "hnsw": if _HNSW is None: raise RuntimeError("hnswlib not available; install hnswlib or use engine='ckdtree'") dim = int(X.shape[1]) index = _HNSW.Index(space='l2', dim=dim) # Defaults tuned for good recall/speed tradeoff; can be parameterized later index.init_index(max_elements=int(X.shape[0]), ef_construction=200, M=16) index.add_items(X.astype(np.float32), np.arange(X.shape[0], dtype=np.int32)) index.set_ef(64) # ef_search tree_obj = index elif eng == "ckdtree": tree_obj = cKDTree(X) else: raise ValueError(f"Unknown engine '{eng}'") return PatternIndex( timeframe=timeframe, window_size=int(window_size), future_size=int(future_size), symbols=symbols_ok, tree=tree_obj, X=X, start_end_idx=start_end_idx, labels=labels_arr, series=series, scale=(scale or "minmax").lower(), metric=(metric or "euclidean").lower(), pca_components=int(pca_components) if pca_components else None, pca_model=pca_model, dimred_method=str(effective_dimred_method or 'none'), dimred_params=effective_dimred_params, reducer=reducer, engine=eng, max_bars_per_symbol=int(max_bars_per_symbol), ) def _apply_scale_vector(x: np.ndarray, scale: str) -> np.ndarray: s = (scale or "minmax").lower() x = np.asarray(x, dtype=float) if s == "zscore": mu = float(np.nanmean(x)) sd = float(np.nanstd(x)) if not np.isfinite(sd) or sd <= 1e-12: return np.zeros_like(x, dtype=np.float32) return ((x - mu) / sd).astype(np.float32) if s == "none": return x.astype(np.float32) # minmax mn = float(np.nanmin(x)) mx = float(np.nanmax(x)) rng = mx - mn if not np.isfinite(rng) or rng <= 1e-12: return np.zeros_like(x, dtype=np.float32) return ((x - mn) / rng).astype(np.float32) def _apply_metric_vector(x: np.ndarray, metric: str) -> np.ndarray: m = (metric or "euclidean").lower() v = np.asarray(x, dtype=np.float32) if m == "cosine": n = float(np.linalg.norm(v)) if not np.isfinite(n) or n <= 1e-12: return np.zeros_like(v, dtype=np.float32) return (v / n).astype(np.float32) if m == "correlation": v = v - float(np.nanmean(v)) n = float(np.linalg.norm(v)) if not np.isfinite(n) or n <= 1e-12: return np.zeros_like(v, dtype=np.float32) return (v / n).astype(np.float32) return v

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/emerzon/mt-data-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server