"""MT5 connection management and safe namespace setup."""
import logging
import threading
from typing import Dict, Any, Optional
import MetaTrader5 as mt5
logger = logging.getLogger(__name__)
# Global lock for MT5 thread safety (MT5 library is NOT thread-safe).
# This is a plain threading.Lock, which is not reentrant: a thread that already
# holds it will block if it tries to acquire it a second time.
_mt5_lock = threading.Lock()
def safe_mt5_call(func, *args, **kwargs):
    """
    Run a single MT5 call while holding the module-wide MT5 lock.

    Only one thread at a time can be inside this wrapper, so calls routed
    through it never overlap with each other.

    Args:
        func: Callable to invoke (typically an ``mt5.*`` function)
        *args: Positional arguments forwarded to ``func``
        **kwargs: Keyword arguments forwarded to ``func``

    Returns:
        Whatever ``func`` returns. Exceptions raised by ``func`` propagate
        unchanged after the lock has been released.

    Example:
        >>> rates = safe_mt5_call(mt5.copy_rates_from_pos, "BTCUSD", mt5.TIMEFRAME_H1, 0, 100)
    """
    with _mt5_lock:
        result = func(*args, **kwargs)
    return result
class MT5Connection:
    """Manages MetaTrader 5 connection and provides safe execution namespace.

    Constructing an instance initializes the MT5 terminal connection (with
    retries) and builds a namespace dictionary exposing a whitelisted subset
    of the ``mt5`` module together with common analysis libraries.
    """

    def __init__(self):
        """Initialize MT5 connection at startup.

        Raises:
            RuntimeError: If the terminal cannot be initialized or the
                execution namespace cannot be built.
        """
        self._initialized = False
        self._safe_namespace: Optional[Dict[str, Any]] = None
        self._initialize()

    def _initialize(self, max_retries=3, retry_delay=1.0):
        """Initialize connection to MT5 terminal with thread safety and retry logic.

        Args:
            max_retries: Maximum number of initialization attempts (>= 1)
            retry_delay: Delay in seconds between retries

        Raises:
            ValueError: If ``max_retries`` is smaller than 1.
            RuntimeError: If every attempt fails, or if the execution
                namespace cannot be built afterwards.
        """
        import time

        # With zero attempts the loop below would never run and we would go on
        # to build a namespace for a connection that was never opened.
        if max_retries < 1:
            raise ValueError(f"max_retries must be at least 1, got {max_retries}")

        for attempt in range(1, max_retries + 1):
            cause = None
            detail = None
            connected = False
            try:
                # Hold the lock only around the MT5 calls themselves. The
                # back-off sleep happens after it is released so that other
                # threads are not blocked while we wait to retry.
                with _mt5_lock:
                    connected = mt5.initialize()
                    if connected:
                        self._initialized = True
                    else:
                        # Read the error under the same lock so another
                        # thread's MT5 call cannot overwrite it first.
                        detail = mt5.last_error()
            except Exception as e:
                cause = e
                detail = str(e)
                logger.error(
                    "Unexpected error during MT5 initialization: %s",
                    detail,
                    exc_info=True,
                )
            else:
                if connected:
                    logger.info(
                        "MT5 connection initialized successfully on attempt %s",
                        attempt,
                    )
                    break
                logger.warning(
                    "MT5 initialization failed (attempt %s/%s): %s",
                    attempt,
                    max_retries,
                    detail,
                )

            if attempt < max_retries:
                time.sleep(retry_delay)
                continue

            if cause is None:
                logger.error("All %s initialization attempts failed", max_retries)
            raise RuntimeError(
                f"Failed to initialize MT5 after {max_retries} attempts: {detail}"
            ) from cause

        # Build safe namespace with read-only functions
        try:
            self._safe_namespace = self._build_safe_namespace()
        except Exception as e:
            logger.error("Failed to build safe namespace: %s", e, exc_info=True)
            raise RuntimeError(f"Failed to build execution namespace: {str(e)}") from e

    def _build_safe_namespace(self) -> Dict[str, Any]:
        """Build namespace with only whitelisted MT5 functions and helpers.

        Returns:
            Dictionary with the keys ``mt5`` (proxy object carrying the
            whitelisted functions/constants), ``datetime``, ``pd``/``pandas``,
            ``np``/``numpy``, ``plt``, ``matplotlib`` and ``ta``; plus ``px``,
            ``go`` and ``plotly`` when plotly can be imported.

        Raises:
            ImportError: If one of the required libraries (pandas, numpy,
                matplotlib, ta) is not installed.
        """
        import datetime
        import pandas as pd
        import numpy as np
        import matplotlib

        matplotlib.use("Agg")  # Non-interactive backend for server use
        import matplotlib.pyplot as plt
        import ta

        # plotly is optional: expose it only when it is installed.
        try:
            import plotly
            import plotly.express as px
            import plotly.graph_objects as go
        except ImportError:
            plotly = px = go = None

        # Whitelisted MT5 functions and constants.
        # NOTE(review): these are the raw mt5 callables; they do not acquire
        # _mt5_lock themselves, so whoever runs code against this namespace is
        # presumably expected to serialize access - verify against the caller.
        safe_mt5_funcs = {
            # Symbol information
            "symbol_info": mt5.symbol_info,
            "symbol_info_tick": mt5.symbol_info_tick,
            # NOTE(review): per the MetaTrader5 docs symbol_select adds/removes
            # a symbol in MarketWatch, so it is not strictly read-only -
            # confirm it is exposed intentionally.
            "symbol_select": mt5.symbol_select,
            "symbols_total": mt5.symbols_total,
            "symbols_get": mt5.symbols_get,
            # Market data
            "copy_rates_from": mt5.copy_rates_from,
            "copy_rates_from_pos": mt5.copy_rates_from_pos,
            "copy_rates_range": mt5.copy_rates_range,
            "copy_ticks_from": mt5.copy_ticks_from,
            "copy_ticks_range": mt5.copy_ticks_range,
            # Account information (read-only)
            "account_info": mt5.account_info,
            "terminal_info": mt5.terminal_info,
            "version": mt5.version,
            # Trading history (read-only)
            "history_deals_get": mt5.history_deals_get,
            "history_orders_get": mt5.history_orders_get,
            "positions_get": mt5.positions_get,
            "positions_total": mt5.positions_total,
            # Timeframe constants
            "TIMEFRAME_M1": mt5.TIMEFRAME_M1,
            "TIMEFRAME_M2": mt5.TIMEFRAME_M2,
            "TIMEFRAME_M3": mt5.TIMEFRAME_M3,
            "TIMEFRAME_M4": mt5.TIMEFRAME_M4,
            "TIMEFRAME_M5": mt5.TIMEFRAME_M5,
            "TIMEFRAME_M6": mt5.TIMEFRAME_M6,
            "TIMEFRAME_M10": mt5.TIMEFRAME_M10,
            "TIMEFRAME_M12": mt5.TIMEFRAME_M12,
            "TIMEFRAME_M15": mt5.TIMEFRAME_M15,
            "TIMEFRAME_M20": mt5.TIMEFRAME_M20,
            "TIMEFRAME_M30": mt5.TIMEFRAME_M30,
            "TIMEFRAME_H1": mt5.TIMEFRAME_H1,
            "TIMEFRAME_H2": mt5.TIMEFRAME_H2,
            "TIMEFRAME_H3": mt5.TIMEFRAME_H3,
            "TIMEFRAME_H4": mt5.TIMEFRAME_H4,
            "TIMEFRAME_H6": mt5.TIMEFRAME_H6,
            "TIMEFRAME_H8": mt5.TIMEFRAME_H8,
            "TIMEFRAME_H12": mt5.TIMEFRAME_H12,
            "TIMEFRAME_D1": mt5.TIMEFRAME_D1,
            "TIMEFRAME_W1": mt5.TIMEFRAME_W1,
            "TIMEFRAME_MN1": mt5.TIMEFRAME_MN1,
            # Tick flags
            "COPY_TICKS_ALL": mt5.COPY_TICKS_ALL,
            "COPY_TICKS_INFO": mt5.COPY_TICKS_INFO,
            "COPY_TICKS_TRADE": mt5.COPY_TICKS_TRADE,
            # Order calculation functions (read-only)
            "order_calc_margin": mt5.order_calc_margin,
            "order_calc_profit": mt5.order_calc_profit,
            "ORDER_TYPE_BUY": mt5.ORDER_TYPE_BUY,
            "ORDER_TYPE_SELL": mt5.ORDER_TYPE_SELL,
        }

        # Create mt5 module-like object with only safe functions
        class SafeMT5:
            """Safe MT5 module proxy carrying only the whitelisted names."""

            def __init__(self, funcs):
                for name, func in funcs.items():
                    setattr(self, name, func)

        safe_mt5 = SafeMT5(safe_mt5_funcs)

        # Build complete namespace
        namespace = {
            "mt5": safe_mt5,
            "datetime": datetime,
            "pd": pd,
            "pandas": pd,
            "np": np,
            "numpy": np,
            "plt": plt,
            "matplotlib": matplotlib,
            "ta": ta,
        }

        # Add plotly if available
        if plotly is not None:
            namespace["px"] = px
            namespace["go"] = go
            namespace["plotly"] = plotly
        return namespace

    def validate_connection(self) -> bool:
        """Validate MT5 connection is still active with thread safety.

        Returns:
            False if this instance was never initialized (or has been shut
            down) or if ``mt5.terminal_info()`` returns None; True otherwise.
        """
        if not self._initialized:
            return False
        # Check if terminal is still connected (thread-safe)
        with _mt5_lock:
            terminal_info = mt5.terminal_info()
        if terminal_info is None:
            logger.warning("MT5 terminal connection lost")
            return False
        return True

    def get_safe_namespace(self) -> Dict[str, Any]:
        """Get the safe execution namespace.

        Returns:
            A shallow copy of the namespace dictionary: callers may add or
            remove keys freely, but the objects stored in it (including the
            ``mt5`` proxy) are shared between all copies.

        Raises:
            RuntimeError: If the connection is not active or the namespace
                has not been built.
        """
        if not self.validate_connection():
            raise RuntimeError("MT5 connection is not active. Ensure MT5 terminal is running.")
        if self._safe_namespace is None:
            raise RuntimeError("MT5 execution namespace has not been built.")
        return self._safe_namespace.copy()

    def shutdown(self):
        """Shutdown MT5 connection with thread safety.

        Does nothing when the connection is not currently initialized, so it
        is safe to call more than once.
        """
        with _mt5_lock:
            if self._initialized:
                mt5.shutdown()
                self._initialized = False
                logger.info("MT5 connection closed")
# Global connection instance
_connection: Optional["MT5Connection"] = None
# Guards creation and teardown of the global instance. This is deliberately a
# separate lock from _mt5_lock: MT5Connection acquires _mt5_lock internally and
# threading.Lock is not reentrant, so reusing it here would deadlock.
_connection_lock = threading.Lock()


def get_connection() -> "MT5Connection":
    """Get or create the global MT5 connection instance.

    Creation is serialized so that concurrent first callers share a single
    ``MT5Connection`` instead of each constructing their own.

    Returns:
        The process-wide ``MT5Connection`` instance.

    Raises:
        RuntimeError: Propagated from ``MT5Connection()`` when the terminal
            cannot be initialized; the global stays unset so a later call
            retries.
    """
    global _connection
    # Fast path: once the instance exists no locking is needed to read it.
    existing = _connection
    if existing is not None:
        return existing
    with _connection_lock:
        # Re-check under the lock: another thread may have created the
        # instance while this one was waiting.
        if _connection is None:
            _connection = MT5Connection()
        return _connection


def shutdown_connection():
    """Shutdown the global MT5 connection.

    Does nothing when no global connection exists. After a successful
    shutdown the global is cleared, so the next ``get_connection()`` call
    creates a fresh instance.
    """
    global _connection
    with _connection_lock:
        if _connection is not None:
            _connection.shutdown()
            _connection = None
def get_safe_namespace() -> Dict[str, Any]:
    """Return the safe execution namespace of the global connection.

    Convenience wrapper: obtains the global connection via
    ``get_connection()`` and delegates to its ``get_safe_namespace()``
    method; any exception raised by either call propagates.
    """
    connection = get_connection()
    return connection.get_safe_namespace()
def validate_connection() -> Dict[str, Any]:
    """
    Check the global MT5 connection and report its status without raising.

    Returns:
        Dictionary with two keys:
        'connected' (bool) - result of the connection check
        'error' - None when connected, otherwise a message string (either a
        generic validation failure or the text of the exception raised)
    """
    try:
        alive = get_connection().validate_connection()
    except Exception as e:
        # Any failure while obtaining or checking the connection is logged
        # and reported through the status dictionary.
        logger.error(f"Connection validation error: {str(e)}")
        return {"connected": False, "error": str(e)}

    failure = None if alive else "Connection validation failed"
    return {"connected": alive, "error": failure}