
MetaTrader5 MCP Server

by emerzon
utils.py (12 kB)
import io
import csv
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple, Set

import pandas as pd
import dateparser

from .constants import PRECISION_ABS_TOL, PRECISION_MAX_DECIMALS, PRECISION_REL_TOL, TIME_DISPLAY_FORMAT


def _coerce_scalar(s: str):
    """Try to coerce a scalar string to int or float; otherwise return original string."""
    try:
        if s is None:
            return s
        st = str(s).strip()
        if st == "":
            return st
        if st.isdigit() or (st.startswith('-') and st[1:].isdigit()):
            return int(st)
        v = float(st)
        return v
    except Exception:
        return s


def _normalize_ohlcv_arg(ohlcv: Optional[str]) -> Optional[Set[str]]:
    """Normalize user-provided OHLCV selection into a set of letters.

    Accepts forms like: 'close', 'price', 'ohlc', 'ohlcv', 'all', 'cl', 'OHLCV',
    or names 'open,high,low,close,volume'. Returns None when not specified.
    """
    if ohlcv is None:
        return None
    text = str(ohlcv).strip()
    if text == "":
        return None
    t = text.lower()
    if t in ("all", "ohlcv"):
        return {"O", "H", "L", "C", "V"}
    if t in ("ohlc",):
        return {"O", "H", "L", "C"}
    if t in ("price", "close"):
        return {"C"}
    # Compact letters like 'cl', 'oh', etc.
    if all(ch in "ohlcv" for ch in t):
        return {ch.upper() for ch in t}
    # Comma separated names
    parts = [p.strip().lower() for p in t.replace(";", ",").split(",") if p.strip() != ""]
    if not parts:
        return None
    mapping = {
        "o": "O", "open": "O",
        "h": "H", "high": "H",
        "l": "L", "low": "L",
        "c": "C", "close": "C", "price": "C",
        "v": "V", "vol": "V", "volume": "V", "tick_volume": "V",
    }
    out: Set[str] = set()
    for p in parts:
        key = mapping.get(p)
        if key:
            out.add(key)
    return out or None


def _csv_from_rows(headers: List[str], rows: List[List[Any]]) -> Dict[str, Any]:
    """Build a normalized CSV payload for tabular results.

    Returns a dict with at least:
    - csv_header: comma-separated column names
    - csv_data: newline-separated CSV rows
    - success: True
    - count: number of data rows
    """
    data_buf = io.StringIO()
    writer = csv.writer(data_buf, lineterminator="\n")
    for row in rows:
        writer.writerow(row)
    return {
        "csv_header": ",".join(headers),
        "csv_data": data_buf.getvalue().rstrip("\n"),
        "success": True,
        "count": len(rows),
    }


# Backwards-compat alias for refactored imports
_csv_from_rows_util = _csv_from_rows


def _format_time_minimal(epoch_seconds: float) -> str:
    """Format epoch seconds into a normalized UTC datetime string.

    Normalized format everywhere: YYYY-MM-DD HH:MM:SS (UTC)
    """
    dt = datetime.utcfromtimestamp(epoch_seconds)
    return dt.strftime(TIME_DISPLAY_FORMAT)


# Backwards-compat alias
_format_time_minimal_util = _format_time_minimal


def _format_time_minimal_local(epoch_seconds: float) -> str:
    """Format epoch seconds into a normalized local/client datetime string.

    Normalized format everywhere: YYYY-MM-DD HH:MM:SS (local/client tz)
    Falls back to UTC if tz resolution fails.
    """
    from ..core.config import mt5_config
    try:
        tz = mt5_config.get_client_tz()
        if tz is not None:
            dt = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).astimezone(tz)
        else:
            dt = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).astimezone()
        return dt.strftime(TIME_DISPLAY_FORMAT)
    except Exception:
        return _format_time_minimal(epoch_seconds)


# Backwards-compat alias
_format_time_minimal_local_util = _format_time_minimal_local


def _use_client_tz(_: object = None) -> bool:
    """Return True when a client timezone is configured."""
    from ..core.config import mt5_config
    try:
        return mt5_config.get_client_tz() is not None
    except Exception:
        return False


# Backwards-compat alias
_use_client_tz_util = _use_client_tz


def _resolve_client_tz(_: object = None):
    """Return the configured client timezone, if any."""
    from ..core.config import mt5_config
    try:
        return mt5_config.get_client_tz()
    except Exception:
        return None


# Backwards-compat alias
_resolve_client_tz_util = _resolve_client_tz


def _time_format_from_epochs(epochs: List[float]) -> str:
    """Return the normalized display format regardless of epoch contents."""
    return TIME_DISPLAY_FORMAT


# Backwards-compat alias
_time_format_from_epochs_util = _time_format_from_epochs


def _maybe_strip_year(fmt: str, epochs: List[float]) -> str:
    """No-op when normalization is requested; keep full year for consistency."""
    return fmt


# Backwards-compat alias
_maybe_strip_year_util = _maybe_strip_year


def _style_time_format(fmt: str) -> str:
    """No special styling; keep normalized spacing."""
    try:
        if 'T' in fmt:
            return fmt.replace('T', ' ')
    except Exception:
        pass
    return fmt


# Backwards-compat alias
_style_time_format_util = _style_time_format


def _optimal_decimals(values: List[float], rel_tol: float = PRECISION_REL_TOL,
                      abs_tol: float = PRECISION_ABS_TOL,
                      max_decimals: int = PRECISION_MAX_DECIMALS) -> int:
    if not values:
        return 0
    nums = [float(v) for v in values if v is not None and not pd.isna(v)]
    if not nums:
        return 0
    scale = max(1.0, max(abs(v) for v in nums))
    tol = max(abs_tol, rel_tol * scale)
    for d in range(0, max_decimals + 1):
        ok = True
        factor = 10.0 ** d
        for v in nums:
            rv = round(v * factor) / factor
            if abs(rv - v) > tol:
                ok = False
                break
        if ok:
            return d
    return max_decimals


def parse_kv_or_json(obj: Any) -> Dict[str, Any]:
    """Parse params/features provided as dict, JSON string, or k=v pairs into a dict.

    - Dict: shallow-copied and returned
    - JSON-like string: parsed via json.loads (with simple fallback for colon/equals pairs)
    - Plain string: split on whitespace/commas into k=v assignments
    """
    import json
    if obj is None:
        return {}
    if isinstance(obj, dict):
        return dict(obj)
    if isinstance(obj, str):
        s = obj.strip()
        if not s:
            return {}
        if (s.startswith('{') and s.endswith('}')):
            try:
                return json.loads(s)
            except Exception:
                # Fallback to simple token parser inside braces
                s = s.strip().strip('{}').strip()
        # Parse simple k=v tokens separated by whitespace/commas
        out: Dict[str, Any] = {}
        toks = [tok for tok in s.replace(',', ' ').split() if tok]
        i = 0
        while i < len(toks):
            tok = toks[i].strip().strip(',')
            if not tok:
                i += 1
                continue
            if '=' in tok:
                k, v = tok.split('=', 1)
                out[k.strip()] = v.strip().strip(',')
                i += 1
                continue
            if tok.endswith(':'):
                key = tok[:-1].strip()
                val = ''
                if i + 1 < len(toks):
                    val = toks[i + 1].strip().strip(',')
                    i += 2
                else:
                    i += 1
                out[key] = val
                continue
            i += 1
        return out
    return {}


def _format_float(v: float, d: int) -> str:
    s = f"{v:.{d}f}"
    if '.' in s:
        s = s.rstrip('0').rstrip('.')
    return s


def _format_numeric_rows_from_df(df: pd.DataFrame, headers: List[str]) -> List[List[str]]:
    out_rows: List[List[str]] = []
    decimals_by_col: Dict[str, int] = {}
    for col in headers:
        if col not in df.columns or col == 'time':
            continue
        if pd.api.types.is_float_dtype(df[col]):
            decimals_by_col[col] = _optimal_decimals(df[col].tolist())
    for _, row in df[headers].iterrows():
        out_row: List[str] = []
        for col in headers:
            val = row[col]
            if col == 'time':
                out_row.append(str(val))
            elif isinstance(val, (float,)) and col in decimals_by_col:
                out_row.append(_format_float(float(val), decimals_by_col[col]))
            else:
                out_row.append(str(val))
        out_rows.append(out_row)
    return out_rows


# Backwards-compat alias
_format_numeric_rows_from_df_util = _format_numeric_rows_from_df


def to_float_np(
    values: Any,
    *,
    coerce: bool = True,
    drop_na: bool = False,
    finite_only: bool = False,
    return_mask: bool = False,
) -> "np.ndarray | Tuple[np.ndarray, 'np.ndarray']":
    """Convert a pandas Series/array-like to a float NumPy array.

    - coerce=True uses `pd.to_numeric(errors='coerce')` to convert invalids to NaN.
    - drop_na=True removes NaN entries from the returned array (mask applied).
    - finite_only=True removes non-finite entries (NaN, inf, -inf).
    - return_mask=True returns (array, mask) where mask marks kept elements.

    Notes: When both drop_na and finite_only are False, the original length is preserved.
    """
    import numpy as np  # local import
    try:
        # Normalize to pandas Series for robust conversion
        if hasattr(values, "to_numpy") and hasattr(values, "dtype"):
            ser = pd.Series(values)
        else:
            ser = pd.Series(values)
        arr = (
            pd.to_numeric(ser, errors="coerce").astype(float).to_numpy()
            if coerce
            else ser.astype(float).to_numpy()
        )
        mask = None
        if drop_na or finite_only:
            if finite_only:
                mask = np.isfinite(arr)
            else:
                mask = ~pd.isna(arr)
            arr = arr[mask]
        if return_mask:
            if mask is None:
                mask = np.ones(arr.shape, dtype=bool)
            return arr, mask
        return arr
    except Exception:
        # Fallbacks
        try:
            arr = np.asarray(values, dtype=float)
            if drop_na or finite_only:
                m = np.isfinite(arr) if finite_only else ~pd.isna(arr)
                arr = arr[m]
                if return_mask:
                    return arr, m
            elif return_mask:
                return arr, np.ones(arr.shape, dtype=bool)
            return arr
        except Exception:
            empty = np.asarray([], dtype=float)
            if return_mask:
                return empty, np.asarray([], dtype=bool)
            return empty


def align_finite(*arrays: Any) -> Tuple["np.ndarray", ...]:
    """Convert arrays to float and align them by keeping only rows where all are finite.

    Returns a tuple of filtered arrays, all of equal length.
    """
    import numpy as np
    conv = [to_float_np(a) for a in arrays]
    if not conv:
        return tuple()
    mask = np.ones_like(conv[0], dtype=bool)
    for a in conv:
        mask &= np.isfinite(a)
    return tuple(a[mask] for a in conv)


def _parse_start_datetime(value: str) -> Optional[datetime]:
    """Parse a flexible date/time string into a UTC-naive datetime.

    Accepts formats like:
    - 2025-08-29
    - 2025-08-29 14:30
    - yesterday 14:00
    - 2 days ago
    - 2025/08/29 14:30 UTC
    """
    if not value:
        return None
    dt = dateparser.parse(
        value,
        settings={
            'RETURN_AS_TIMEZONE_AWARE': True,
            'TIMEZONE': 'UTC',
            'TO_TIMEZONE': 'UTC',
            'PREFER_DAY_OF_MONTH': 'first',
        },
    )
    if not dt:
        return None
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt


# Backwards-compat alias
_parse_start_datetime_util = _parse_start_datetime
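
The helpers above are mostly self-contained, so a few direct calls illustrate the intended inputs and outputs. The sketch below is not part of utils.py: it assumes the module is importable from within its parent package (the relative imports `.constants` and `..core.config` mean it cannot be run standalone), and all values are made up for demonstration.

# Illustrative usage sketch (hypothetical import path, values for demonstration only).
# from <package>.utils import (
#     _normalize_ohlcv_arg, parse_kv_or_json, _parse_start_datetime,
#     _csv_from_rows, align_finite,
# )

_normalize_ohlcv_arg("ohlc")            # -> {"O", "H", "L", "C"}
_normalize_ohlcv_arg("close,volume")    # -> {"C", "V"}

parse_kv_or_json('{"period": 14}')      # -> {"period": 14}  (JSON path)
parse_kv_or_json("period=14 mode=ema")  # -> {"period": "14", "mode": "ema"}  (k=v path, values stay strings)

_parse_start_datetime("2 days ago")     # -> UTC-naive datetime, or None if unparseable

_csv_from_rows(["time", "close"], [["2025-08-29 14:30:00", 1.1012]])
# -> {"csv_header": "time,close",
#     "csv_data": "2025-08-29 14:30:00,1.1012",
#     "success": True, "count": 1}

align_finite([1.0, float("nan"), 3.0], [10.0, 20.0, 30.0])
# -> (array([1., 3.]), array([10., 30.]))  only rows finite in every input are kept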
