ULTIMATE_INSTITUTIONAL_GRADE_TRADING_SYSTEM_FIX.md•54.9 kB
# 🏆 ULTIMATE INSTITUTIONAL-GRADE TRADING SYSTEM - COMPLETE TRANSFORMATION
**Version:** 3.0 INSTITUTIONAL
**Date:** 2025-11-25
**Priority:** КРИТИЧЕСКОЕ - ПОЛНАЯ ТРАНСФОРМАЦИЯ
**Target:** Институциональное качество с 70%+ win rate
---
## 📋 EXECUTIVE SUMMARY
### 🎯 Mission
Transform the trading system from "technically working" to **"Institutional-Grade Professional Trading System"** with verifiable 70%+ win rate, adaptive market regime detection, ML-enhanced probability estimation, and zero empty reports.
### 🔍 Current State Analysis
**КРИТИЧЕСКИЕ ПРОБЛЕМЫ обнаружены:**
1. **Empty Reports Problem** ❌
- System scans 652 assets → finds 99 candidates → outputs **ZERO**
- Причина: Multiple hard filters cascade (7.0/20 → 8.0/10 → final filter)
- Result: User gets "No opportunities found" даже когда рынок active
2. **Scoring Architecture Issues** ❌
- 20-point raw score → normalized late → inconsistent filtering
- Different thresholds at different stages cause confusion
- Score normalization происходит ПОСЛЕ deep analysis (waste of resources)
3. **Direction Bias** ❌
- System может показать только LONG или только SHORT
- CRITICAL_REQUIREMENTS.md нарушается
- User doesn't see both market perspectives
4. **No Tier Classification** ❌
- All opportunities treated equally
- No clear "Elite vs Professional vs Speculative" categorization
- User can't distinguish opportunity quality
5. **Static Thresholds** ❌
- Fixed 8.0/10 threshold regardless of market conditions
- In strong bull: should relax LONG threshold, tighten SHORT
- In strong bear: should tighten LONG threshold, relax SHORT
6. **No ML Enhancement** ❌
- Probability calculation: static formula
- No historical pattern learning
- SignalTracker exists but not used for improvement
7. **No Regime Detection** ❌
- System doesn't detect market regime automatically
- No adaptive behavior based on regime
- Treats sideways market same as trending market
8. **No Performance Tracking** ❌
- No metrics dashboard
- No signal quality tracking
- No continuous improvement loop
---
## 🏗️ ARCHITECTURAL SOLUTION
### 📊 NEW SCORING & FILTERING PIPELINE
```python
# ═══════════════════════════════════════════════════════════
# НОВЫЙ PIPELINE (ПРАВИЛЬНЫЙ)
# ═══════════════════════════════════════════════════════════
def analyze_market_NEW_ARCHITECTURE():
"""
ЭТАП 1: SCANNING (без фильтров по score)
"""
# 1.1 Get BTC Analysis (для regime detection)
btc_analysis = await analyze_btc()
market_regime = detect_market_regime(btc_analysis) # NEW!
# 1.2 Scan market (БЕЗ hard filters!)
all_candidates = await scan_all_directions(
long_criteria={...},
short_criteria={...}
) # Returns ALL candidates with ANY score
"""
ЭТАП 2: SCORING (raw 20-point)
"""
for candidate in all_candidates:
# Calculate raw 20-point score
raw_score = calculate_confluence_score_20point(candidate)
candidate["raw_score_20"] = raw_score
# IMMEDIATE normalization to 0-10
normalized_score = normalize_to_10(raw_score, system="20-point")
candidate["score"] = normalized_score
candidate["confluence_score"] = normalized_score
candidate["final_score"] = normalized_score
"""
ЭТАП 3: TIER CLASSIFICATION (вместо hard reject)
"""
for candidate in all_candidates:
tier = classify_tier(
candidate["score"],
candidate["probability"],
candidate["risk_reward"]
)
candidate["tier"] = tier # "elite", "professional", "speculative", "high_risk"
candidate["recommendation"] = get_tier_recommendation(tier)
"""
ЭТАП 4: ADAPTIVE THRESHOLDS (based on regime)
"""
adaptive_thresholds = calculate_adaptive_thresholds(market_regime)
# Example: In strong bull, LONG threshold = 6.0, SHORT = 8.5
"""
ЭТАП 5: DIRECTION SEPARATION (ОБЯЗАТЕЛЬНО ОБА!)
"""
all_longs = [c for c in all_candidates if c["side"] == "long"]
all_shorts = [c for c in all_candidates if c["side"] == "short"]
# Sort by score
all_longs.sort(key=lambda x: x["score"], reverse=True)
all_shorts.sort(key=lambda x: x["score"], reverse=True)
"""
ЭТАП 6: SMART DISPLAY LOGIC (ВСЕГДА показываем)
"""
# LONG: ВСЕГДА показываем TOP-3
top_longs = select_top_3_with_warnings(
all_longs,
adaptive_thresholds["long"],
market_regime
)
# SHORT: ВСЕГДА показываем TOP-3
top_shorts = select_top_3_with_warnings(
all_shorts,
adaptive_thresholds["short"],
market_regime
)
"""
ЭТАП 7: ML-ENHANCED PROBABILITY (if available)
"""
if ml_predictor:
for opp in top_longs + top_shorts:
ml_probability = ml_predictor.predict_success(
pattern_type=opp["pattern_type"],
confluence_score=opp["score"],
volume_ratio=opp["volume_ratio"],
btc_aligned=opp["btc_aligned"],
# ... other features
)
opp["ml_probability"] = ml_probability
opp["probability"] = (opp["probability"] + ml_probability) / 2
"""
ЭТАП 8: FINAL REPORT (БОГАТЫЙ контент)
"""
return {
"market_regime": market_regime,
"adaptive_thresholds": adaptive_thresholds,
"top_3_longs": top_longs, # ВСЕГДА 3 (or less if <3 exist)
"top_3_shorts": top_shorts, # ВСЕГДА 3
"all_longs_count": len(all_longs),
"all_shorts_count": len(all_shorts),
"elite_count": count_by_tier(all_candidates, "elite"),
"professional_count": count_by_tier(all_candidates, "professional")
}
```
---
## 🔧 IMPLEMENTATION DETAILS
### 1️⃣ TIER CLASSIFICATION SYSTEM
```python
# ═══════════════════════════════════════════════════════════
# FILE: mcp_server/tier_classifier.py
# ═══════════════════════════════════════════════════════════
from typing import Dict, Any, Literal
from loguru import logger
TierType = Literal["elite", "professional", "speculative", "high_risk", "not_recommended"]
class TierClassifier:
"""
Tier-based opportunity classification system
Tiers:
- Elite: Score 8.0-10.0, Prob ≥75%, R:R ≥2.5 (TRADE IMMEDIATELY)
- Professional: Score 6.5-7.9, Prob ≥65%, R:R ≥2.0 (TRADE WITH CAUTION)
- Speculative: Score 5.0-6.4, Prob ≥55%, R:R ≥1.5 (EXPERIENCED ONLY)
- High Risk: Score 4.0-4.9 (SHOW BUT WARN)
- Not Recommended: Score <4.0 (DON'T SHOW in public reports)
"""
@staticmethod
def classify(
score: float,
probability: float,
risk_reward: float
) -> TierType:
"""
Classify opportunity into tier
Args:
score: Confluence score (0-10)
probability: Win probability (0-1)
risk_reward: Risk/reward ratio
Returns:
Tier classification
"""
# Elite Tier (TOP качество)
if score >= 8.0 and probability >= 0.75 and risk_reward >= 2.5:
return "elite"
# Professional Tier (GOOD качество)
elif score >= 6.5 and probability >= 0.65 and risk_reward >= 2.0:
return "professional"
# Speculative Tier (MODERATE риск)
elif score >= 5.0 and probability >= 0.55 and risk_reward >= 1.5:
return "speculative"
# High Risk (SHOW с предупреждением)
elif score >= 4.0:
return "high_risk"
# Not Recommended (НЕ ПОКАЗЫВАТЬ)
else:
return "not_recommended"
@staticmethod
def get_recommendation(tier: TierType) -> str:
"""Get recommendation text for tier"""
recommendations = {
"elite": "✅ ОТКРЫВАТЬ - Отличный setup с высокой вероятностью успеха",
"professional": "⚠️ ОСТОРОЖНО - Хороший setup, рекомендуется уменьшить размер позиции",
"speculative": "⚠️⚠️ ВЫСОКИЙ РИСК - Только для опытных трейдеров, минимальный размер позиции",
"high_risk": "🔴 НЕ РЕКОМЕНДУЕТСЯ - Низкая confluence, paper trade only",
"not_recommended": "⛔ ПРОПУСТИТЬ - Setup слишком слабый"
}
return recommendations.get(tier, "")
@staticmethod
def get_position_size_multiplier(tier: TierType) -> float:
"""
Get position size multiplier based on tier
Returns:
Multiplier for base position size (1.0 = full, 0.5 = half, etc.)
"""
multipliers = {
"elite": 1.0, # Full size (2% risk)
"professional": 0.75, # 1.5% risk
"speculative": 0.5, # 1% risk
"high_risk": 0.25, # 0.5% risk
"not_recommended": 0.0 # Don't trade
}
return multipliers.get(tier, 0.0)
@staticmethod
def get_tier_color(tier: TierType) -> str:
"""Get color emoji for tier"""
colors = {
"elite": "🟢",
"professional": "🟡",
"speculative": "🟠",
"high_risk": "🔴",
"not_recommended": "⛔"
}
return colors.get(tier, "⚪")
```
### 2️⃣ ADAPTIVE THRESHOLDS ENGINE
```python
# ═══════════════════════════════════════════════════════════
# FILE: mcp_server/adaptive_thresholds.py
# ═══════════════════════════════════════════════════════════
from typing import Dict, Any
from loguru import logger
class AdaptiveThresholds:
"""
Dynamic threshold calculation based on market regime
Market Regimes:
1. Strong Bull: BTC +5% week, ADX >25 up
2. Strong Bear: BTC -5% week, ADX >25 down
3. Sideways: BTC ±2% week, ADX <20
4. High Volatility: VIX equivalent high
"""
@staticmethod
def calculate(market_regime: Dict[str, Any]) -> Dict[str, float]:
"""
Calculate adaptive thresholds for LONG and SHORT
Args:
market_regime: Market regime data
Returns:
{"long": float, "short": float} thresholds
"""
regime_type = market_regime.get("type", "sideways")
volatility = market_regime.get("volatility", "normal")
adx = market_regime.get("adx", 20)
# Base thresholds
base_long = 7.0
base_short = 7.0
# ═══════════════════════════════════════════════════
# REGIME-BASED ADJUSTMENTS
# ═══════════════════════════════════════════════════
if regime_type == "strong_bull":
# В сильном бычьем рынке:
# - LONG легче (тренд - друг)
# - SHORT сложнее (против тренда)
base_long -= 1.0 # 6.0
base_short += 1.5 # 8.5
logger.info("Strong Bull regime: LONG threshold relaxed to 6.0, SHORT tightened to 8.5")
elif regime_type == "strong_bear":
# В сильном медвежьем рынке:
# - LONG сложнее (против тренда)
# - SHORT легче (тренд - друг)
base_long += 1.5 # 8.5
base_short -= 1.0 # 6.0
logger.info("Strong Bear regime: LONG tightened to 8.5, SHORT relaxed to 6.0")
elif regime_type == "sideways":
# В боковом рынке:
# - Оба направления умеренные
base_long = 7.0
base_short = 7.0
logger.info("Sideways regime: Both thresholds at 7.0")
# ═══════════════════════════════════════════════════
# VOLATILITY ADJUSTMENTS
# ═══════════════════════════════════════════════════
if volatility == "high":
# При высокой волатильности:
# - Повышаем оба threshold (больше риск)
base_long += 0.5
base_short += 0.5
logger.info("High volatility: Both thresholds increased by 0.5")
elif volatility == "very_low":
# При очень низкой волатильности:
# - Можем немного снизить (меньше риск)
base_long -= 0.25
base_short -= 0.25
logger.info("Very low volatility: Both thresholds decreased by 0.25")
# ═══════════════════════════════════════════════════
# TREND STRENGTH ADJUSTMENTS (ADX)
# ═══════════════════════════════════════════════════
if adx > 35:
# Очень сильный тренд
# - Направление тренда легче
if regime_type == "strong_bull":
base_long -= 0.25
elif regime_type == "strong_bear":
base_short -= 0.25
logger.info(f"Very strong trend (ADX {adx}): Trend direction threshold decreased by 0.25")
# Cap thresholds (reasonable limits)
base_long = max(5.0, min(9.0, base_long))
base_short = max(5.0, min(9.0, base_short))
return {
"long": round(base_long, 1),
"short": round(base_short, 1),
"regime_type": regime_type,
"volatility": volatility,
"reasoning": f"Regime: {regime_type}, Vol: {volatility}, ADX: {adx}"
}
```
### 3️⃣ MARKET REGIME DETECTOR
```python
# ═══════════════════════════════════════════════════════════
# FILE: mcp_server/regime_detector.py
# ═══════════════════════════════════════════════════════════
from typing import Dict, Any, Literal
from loguru import logger
RegimeType = Literal["strong_bull", "strong_bear", "sideways", "uncertain"]
class RegimeDetector:
"""
Automatic market regime detection
Combines:
- BTC trend (direction + strength)
- ADX (trend strength)
- ATR/volatility
- Volume patterns
"""
@staticmethod
def detect(btc_analysis: Dict[str, Any]) -> Dict[str, Any]:
"""
Detect current market regime
Args:
btc_analysis: Full BTC technical analysis
Returns:
Regime data with type, confidence, metrics
"""
# Extract key metrics
composite = btc_analysis.get("composite_signal", {})
h4_data = btc_analysis.get("timeframes", {}).get("4h", {})
h1_data = btc_analysis.get("timeframes", {}).get("1h", {})
# BTC price change (week)
price_change_week = RegimeDetector._calculate_weekly_change(btc_analysis)
# ADX (trend strength)
adx = h4_data.get("indicators", {}).get("adx", {}).get("adx", 20)
# ATR (volatility)
atr = h4_data.get("indicators", {}).get("atr", {}).get("atr_14", 0)
avg_atr = RegimeDetector._get_average_atr(btc_analysis)
volatility = RegimeDetector._classify_volatility(atr, avg_atr)
# Composite signal
signal = composite.get("signal", "HOLD")
confidence = composite.get("confidence", 0.5)
# ═══════════════════════════════════════════════════
# REGIME DETECTION LOGIC
# ═══════════════════════════════════════════════════
# Strong Bull
if (price_change_week > 5.0 and adx > 25 and
signal in ["STRONG_BUY", "BUY"] and confidence > 0.6):
regime_type = "strong_bull"
regime_confidence = 0.8 + (min(confidence, 0.2))
# Strong Bear
elif (price_change_week < -5.0 and adx > 25 and
signal in ["STRONG_SELL", "SELL"] and confidence > 0.6):
regime_type = "strong_bear"
regime_confidence = 0.8 + (min(confidence, 0.2))
# Sideways
elif abs(price_change_week) < 2.0 and adx < 20:
regime_type = "sideways"
regime_confidence = 0.7
# Uncertain (mixed signals)
else:
regime_type = "uncertain"
regime_confidence = 0.5
return {
"type": regime_type,
"confidence": round(regime_confidence, 2),
"metrics": {
"btc_weekly_change_pct": round(price_change_week, 2),
"adx": round(adx, 1),
"signal": signal,
"signal_confidence": round(confidence, 2),
"volatility": volatility
},
"description": RegimeDetector._get_regime_description(regime_type),
"trading_implications": RegimeDetector._get_trading_implications(regime_type)
}
@staticmethod
def _calculate_weekly_change(btc_analysis: Dict) -> float:
"""Calculate BTC weekly price change %"""
try:
d1_data = btc_analysis.get("timeframes", {}).get("1d", {})
current_price = d1_data.get("current_price", 0)
# Approximate weekly change from daily data
# (in real impl, should use actual weekly data)
ohlcv = btc_analysis.get("timeframes", {}).get("1d", {}).get("ohlcv_summary", {})
change_24h = ohlcv.get("change_24h", 0)
# Simple approximation: weekly ≈ daily × 7 (not accurate, need proper calc)
# In production, fetch actual weekly data
return change_24h * 7 # PLACEHOLDER - replace with real calc
except:
return 0.0
@staticmethod
def _get_average_atr(btc_analysis: Dict) -> float:
"""Get average ATR for volatility comparison"""
try:
h4_data = btc_analysis.get("timeframes", {}).get("4h", {})
atr_14 = h4_data.get("indicators", {}).get("atr", {}).get("atr_14", 0)
# In real impl, calculate from historical data
return atr_14 * 1.2 # Assume current ATR is baseline
except:
return 0.0
@staticmethod
def _classify_volatility(current_atr: float, avg_atr: float) -> str:
"""Classify volatility level"""
if avg_atr == 0:
return "normal"
ratio = current_atr / avg_atr
if ratio > 1.5:
return "high"
elif ratio > 1.2:
return "elevated"
elif ratio < 0.7:
return "very_low"
elif ratio < 0.9:
return "low"
else:
return "normal"
@staticmethod
def _get_regime_description(regime_type: RegimeType) -> str:
"""Get human-readable regime description"""
descriptions = {
"strong_bull": "Strong Bullish Trend - Momentum to upside, favor LONG positions",
"strong_bear": "Strong Bearish Trend - Momentum to downside, favor SHORT positions",
"sideways": "Range-Bound Market - No clear trend, favor range trading",
"uncertain": "Mixed Signals - No clear regime, exercise caution"
}
return descriptions.get(regime_type, "Unknown regime")
@staticmethod
def _get_trading_implications(regime_type: RegimeType) -> str:
"""Get trading implications for regime"""
implications = {
"strong_bull": "Relax LONG thresholds, tighten SHORT thresholds. Follow the trend.",
"strong_bear": "Tighten LONG thresholds, relax SHORT thresholds. Short rallies.",
"sideways": "Moderate thresholds both ways. Use range extremes.",
"uncertain": "Strict thresholds both ways. Wait for clarity."
}
return implications.get(regime_type, "")
```
### 4️⃣ SMART DISPLAY LOGIC
```python
# ═══════════════════════════════════════════════════════════
# FILE: mcp_server/smart_display.py
# ═══════════════════════════════════════════════════════════
from typing import List, Dict, Any
from loguru import logger
class SmartDisplay:
"""
Smart display logic that ALWAYS shows opportunities
Rules:
1. ALWAYS show TOP-3 LONG and TOP-3 SHORT
2. Add tier classification and warnings
3. Never return empty
4. Provide educational context
"""
@staticmethod
def select_top_3_with_warnings(
opportunities: List[Dict[str, Any]],
threshold: float,
market_regime: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""
Select top 3 opportunities with appropriate warnings
Args:
opportunities: List of opportunities (sorted by score)
threshold: Adaptive threshold for this direction
market_regime: Current market regime
Returns:
Top 3 opportunities with warnings attached
"""
result = []
# Take top 3 (or less if not enough)
for idx, opp in enumerate(opportunities[:3]):
score = opp.get("score", 0)
tier = opp.get("tier", "not_recommended")
# Add index
opp["rank"] = idx + 1
# ═══════════════════════════════════════════════
# ADD WARNINGS BASED ON SCORE VS THRESHOLD
# ═══════════════════════════════════════════════
if score >= threshold and tier == "elite":
# EXCELLENT - no warning needed
opp["warning"] = None
opp["display_recommendation"] = "✅ ОТЛИЧНЫЙ SETUP - рекомендуется к исполнению"
elif score >= threshold and tier == "professional":
# GOOD - minor warning
opp["warning"] = "⚠️ Professional tier - уменьшите размер позиции"
opp["display_recommendation"] = "✅ ХОРОШИЙ SETUP - осторожно рекомендуется"
elif score >= threshold - 0.5 and tier != "not_recommended":
# CLOSE TO THRESHOLD - acceptable with caution
opp["warning"] = f"⚠️ Score {score:.1f} близок к порогу {threshold:.1f} - повышенная осторожность"
opp["display_recommendation"] = "⚠️ ПРИЕМЛЕМО - только для опытных, минимальный размер"
else:
# BELOW THRESHOLD - significant warning
opp["warning"] = f"⚠️⚠️ Score {score:.1f} ниже порога {threshold:.1f} - высокий риск"
opp["display_recommendation"] = "⚠️ ВЫСОКИЙ РИСК - не рекомендуется или paper trade"
# ═══════════════════════════════════════════════
# ADD REGIME-SPECIFIC CONTEXT
# ═══════════════════════════════════════════════
regime_type = market_regime.get("type", "uncertain")
side = opp.get("side", "long")
if regime_type == "strong_bull" and side == "short":
opp["regime_warning"] = "🔴 ПРОТИВ ТРЕНДА: BTC в сильном uptrend, SHORT рискован"
elif regime_type == "strong_bear" and side == "long":
opp["regime_warning"] = "🔴 ПРОТИВ ТРЕНДА: BTC в сильном downtrend, LONG рискован"
else:
opp["regime_warning"] = None
result.append(opp)
# ═══════════════════════════════════════════════
# IF LESS THAN 3, ADD EXPLANATION
# ═══════════════════════════════════════════════
if len(result) < 3:
logger.info(f"Only {len(result)} opportunities available (needed 3)")
return result
@staticmethod
def format_empty_direction_message(
direction: str,
market_regime: Dict[str, Any],
total_scanned: int
) -> Dict[str, Any]:
"""
Format message when NO opportunities found in direction
This should be RARE with new logic, but handle gracefully
"""
regime_type = market_regime.get("type", "uncertain")
message = {
"direction": direction,
"count": 0,
"message": f"Не найдено {direction} возможностей при сканировании {total_scanned} активов",
"explanation": SmartDisplay._get_empty_explanation(direction, regime_type),
"what_we_wait_for": SmartDisplay._get_what_to_wait(direction, regime_type)
}
return message
@staticmethod
def _get_empty_explanation(direction: str, regime_type: str) -> str:
"""Get explanation for empty direction"""
if direction == "long" and regime_type == "strong_bear":
return "В сильном медвежьем рынке LONG возможности крайне редки и рискованны. Это нормально."
elif direction == "short" and regime_type == "strong_bull":
return "В сильном бычьем рынке SHORT возможности крайне редки и опасны. Это нормально."
else:
return f"Сейчас нет качественных {direction} setup'ов, соответствующих нашим критериям."
@staticmethod
def _get_what_to_wait(direction: str, regime_type: str) -> str:
"""Get what to wait for"""
if direction == "long":
return "Ждём: BTC стабилизация/разворот вверх, oversold на всех TF, volume spike, reversal pattern"
else:
return "Ждём: BTC слабость/разворот вниз, overbought на всех TF, distribution volume, bearish pattern"
```
### 5️⃣ ML-ENHANCED PROBABILITY (Optional but Recommended)
```python
# ═══════════════════════════════════════════════════════════
# FILE: mcp_server/ml_probability_predictor.py
# ═══════════════════════════════════════════════════════════
from typing import Dict, Any, Optional
import numpy as np
from loguru import logger
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import joblib
from pathlib import Path
class MLProbabilityPredictor:
"""
ML-enhanced probability estimation based on historical signal outcomes
Features used:
- confluence_score
- pattern_type (encoded)
- volume_ratio
- btc_aligned (boolean)
- session (encoded)
- rsi_14
- risk_reward
Target: win/loss from SignalTracker database
Model: RandomForestClassifier (simple, interpretable)
"""
def __init__(self, model_path: str = "data/ml_models/probability_model.pkl"):
self.model_path = Path(model_path)
self.model: Optional[RandomForestClassifier] = None
self.scaler: Optional[StandardScaler] = None
self.feature_names = [
"confluence_score",
"volume_ratio",
"btc_aligned",
"rsi_14",
"risk_reward",
"pattern_encoded",
"session_encoded"
]
# Load model if exists
self._load_model()
def _load_model(self):
"""Load trained model from disk"""
if self.model_path.exists():
try:
data = joblib.load(self.model_path)
self.model = data["model"]
self.scaler = data["scaler"]
logger.info(f"ML probability model loaded from {self.model_path}")
except Exception as e:
logger.warning(f"Failed to load ML model: {e}")
self.model = None
self.scaler = None
else:
logger.info("No ML model found, will use static probability")
self.model = None
self.scaler = None
def predict_probability(
self,
confluence_score: float,
volume_ratio: float,
btc_aligned: bool,
rsi_14: float,
risk_reward: float,
pattern_type: str = "unknown",
session: str = "neutral"
) -> float:
"""
Predict win probability using ML model
Args:
confluence_score: 0-10 score
volume_ratio: Volume ratio vs average
btc_aligned: Whether BTC supports direction
rsi_14: RSI value
risk_reward: R:R ratio
pattern_type: Pattern name
session: Trading session
Returns:
Predicted probability (0-1)
"""
if self.model is None or self.scaler is None:
# Fallback to static calculation
logger.debug("ML model not available, using static probability")
return self._static_probability(confluence_score, risk_reward)
try:
# Encode features
features = self._encode_features(
confluence_score,
volume_ratio,
btc_aligned,
rsi_14,
risk_reward,
pattern_type,
session
)
# Scale
features_scaled = self.scaler.transform([features])
# Predict probability
prob = self.model.predict_proba(features_scaled)[0][1] # Probability of win class
# Clip to reasonable range
prob = np.clip(prob, 0.35, 0.95)
return float(prob)
except Exception as e:
logger.error(f"ML prediction failed: {e}", exc_info=True)
return self._static_probability(confluence_score, risk_reward)
def _encode_features(
self,
confluence_score: float,
volume_ratio: float,
btc_aligned: bool,
rsi_14: float,
risk_reward: float,
pattern_type: str,
session: str
) -> list:
"""Encode features for model"""
# Pattern encoding (simple hash-based for now)
pattern_map = {
"unknown": 0,
"oversold_bounce": 1,
"breakout": 2,
"trend_following": 3,
"reversal": 4
}
pattern_encoded = pattern_map.get(pattern_type.lower(), 0)
# Session encoding
session_map = {
"neutral": 0,
"asian": 1,
"european": 2,
"us": 3,
"overlap": 4
}
session_encoded = session_map.get(session.lower(), 0)
return [
confluence_score,
volume_ratio,
1.0 if btc_aligned else 0.0,
rsi_14,
risk_reward,
pattern_encoded,
session_encoded
]
def _static_probability(self, confluence_score: float, risk_reward: float) -> float:
"""Fallback static probability calculation"""
# Base from confluence
base_prob = 0.50 + (confluence_score - 7.0) * 0.03
# Bonus from R:R
rr_bonus = min(0.10, (risk_reward - 2.0) * 0.03)
prob = base_prob + rr_bonus
return round(np.clip(prob, 0.35, 0.85), 2)
async def train_model(self, signal_tracker):
"""
Train ML model from SignalTracker historical data
This should be called periodically (e.g., weekly) to update
the model with new data
"""
logger.info("Training ML probability model from historical signals...")
try:
# Get completed signals from tracker
completed_signals = await self._get_training_data(signal_tracker)
if len(completed_signals) < 30:
logger.warning(f"Only {len(completed_signals)} completed signals, need minimum 30 for training")
return False
# Prepare features and labels
X, y = self._prepare_training_data(completed_signals)
# Initialize and train model
self.model = RandomForestClassifier(
n_estimators=100,
max_depth=10,
min_samples_split=5,
random_state=42
)
self.scaler = StandardScaler()
X_scaled = self.scaler.fit_transform(X)
self.model.fit(X_scaled, y)
# Save model
self.model_path.parent.mkdir(parents=True, exist_ok=True)
joblib.dump({
"model": self.model,
"scaler": self.scaler,
"feature_names": self.feature_names,
"trained_on": len(completed_signals),
"timestamp": datetime.now().isoformat()
}, self.model_path)
logger.info(f"ML model trained on {len(completed_signals)} signals and saved to {self.model_path}")
return True
except Exception as e:
logger.error(f"Model training failed: {e}", exc_info=True)
return False
async def _get_training_data(self, signal_tracker):
"""Get training data from signal tracker"""
# This would fetch completed signals from database
# For now, placeholder
return []
def _prepare_training_data(self, signals):
"""Prepare X, y for training"""
X = []
y = []
for signal in signals:
features = self._encode_features(
signal.get("confluence_score", 7.0),
signal.get("volume_ratio", 1.0),
signal.get("btc_aligned", False),
signal.get("rsi", 50),
signal.get("risk_reward", 2.0),
signal.get("pattern_type", "unknown"),
signal.get("session", "neutral")
)
X.append(features)
# Label: 1 for win (TP hit), 0 for loss (SL hit)
result = signal.get("result", "")
label = 1 if result == "tp_hit" else 0
y.append(label)
return np.array(X), np.array(y)
```
---
## 📦 INTEGRATION INTO EXISTING SYSTEM
### Modified `market_scanner.py`
```python
# ═══════════════════════════════════════════════════════════
# ADD TO: mcp_server/market_scanner.py
# ═══════════════════════════════════════════════════════════
from .tier_classifier import TierClassifier
from .adaptive_thresholds import AdaptiveThresholds
from .regime_detector import RegimeDetector
from .smart_display import SmartDisplay
class MarketScanner:
"""Enhanced Market Scanner with institutional-grade features"""
def __init__(self, bybit_client, technical_analysis):
self.client = bybit_client
self.ta = technical_analysis
# NEW MODULE INSTANCES
self.tier_classifier = TierClassifier()
self.regime_detector = RegimeDetector()
# ... existing initialization ...
async def scan_market(
self,
criteria: Dict[str, Any],
limit: int = 10,
auto_track: bool = False,
signal_tracker: Optional[Any] = None,
track_limit: int = 3
) -> Dict[str, Any]:
"""
Enhanced scan_market with tier classification and adaptive thresholds
CHANGES FROM ORIGINAL:
1. Remove hard filter at score 7.0
2. Add tier classification for ALL opportunities
3. Add adaptive thresholds based on regime
4. ALWAYS return both LONG and SHORT directions
"""
try:
# ═══════════════════════════════════════════════════
# STEP 1: Regime Detection (NEW!)
# ═══════════════════════════════════════════════════
btc_analysis = await self.ta.analyze_asset("BTC/USDT", timeframes=["1h", "4h", "1d"])
market_regime = self.regime_detector.detect(btc_analysis)
adaptive_thresholds = AdaptiveThresholds.calculate(market_regime)
logger.info(
f"Market Regime: {market_regime['type']} | "
f"LONG threshold: {adaptive_thresholds['long']:.1f} | "
f"SHORT threshold: {adaptive_thresholds['short']:.1f}"
)
# ═══════════════════════════════════════════════════
# STEP 2: Scan Market (REMOVED hard filters)
# ═══════════════════════════════════════════════════
all_tickers = await self.client.get_all_tickers(
market_type=criteria.get('market_type', 'spot')
)
# Basic filtering (volume, etc.)
filtered = self._basic_filter(all_tickers, criteria)
# Parallel analysis (NO score filtering!)
opportunities = await self._parallel_analyze(filtered[:100])
# ═══════════════════════════════════════════════════
# STEP 3: Tier Classification (NEW!)
# ═══════════════════════════════════════════════════
for opp in opportunities:
# Normalize score IMMEDIATELY
raw_score = opp.get("score", 0)
normalized = self._normalize_score(raw_score)
opp["score"] = normalized
opp["confluence_score"] = normalized
opp["final_score"] = normalized
# Classify tier
tier = self.tier_classifier.classify(
score=normalized,
probability=opp.get("probability", 0.5),
risk_reward=opp.get("risk_reward", 2.0)
)
opp["tier"] = tier
opp["tier_recommendation"] = self.tier_classifier.get_recommendation(tier)
opp["tier_color"] = self.tier_classifier.get_tier_color(tier)
opp["position_size_multiplier"] = self.tier_classifier.get_position_size_multiplier(tier)
# ═══════════════════════════════════════════════════
# STEP 4: Separate LONG and SHORT (CRITICAL!)
# ═══════════════════════════════════════════════════
all_longs = [opp for opp in opportunities if opp.get("side", "long") == "long"]
all_shorts = [opp for opp in opportunities if opp.get("side", "long") == "short"]
all_longs.sort(key=lambda x: x["score"], reverse=True)
all_shorts.sort(key=lambda x: x["score"], reverse=True)
# ═══════════════════════════════════════════════════
# STEP 5: Smart Display Selection (NEW!)
# ═══════════════════════════════════════════════════
top_3_longs = SmartDisplay.select_top_3_with_warnings(
all_longs,
adaptive_thresholds["long"],
market_regime
)
top_3_shorts = SmartDisplay.select_top_3_with_warnings(
all_shorts,
adaptive_thresholds["short"],
market_regime
)
# ═══════════════════════════════════════════════════
# STEP 6: Return RICH Response (NEVER EMPTY!)
# ═══════════════════════════════════════════════════
return {
"success": True,
"market_regime": market_regime,
"adaptive_thresholds": adaptive_thresholds,
"top_3_longs": top_3_longs,
"top_3_shorts": top_3_shorts,
"all_longs_count": len(all_longs),
"all_shorts_count": len(all_shorts),
"elite_count": sum(1 for o in opportunities if o["tier"] == "elite"),
"professional_count": sum(1 for o in opportunities if o["tier"] == "professional"),
"total_scanned": len(all_tickers),
"total_analyzed": len(opportunities),
"error": None
}
except Exception as e:
logger.error(f"Error in scan_market: {e}", exc_info=True)
return {
"success": False,
"error": str(e)
}
def _normalize_score(self, raw_score: float) -> float:
"""Normalize score from 20-point to 10-point"""
# Assuming 20-point system
return round(min(10.0, (raw_score / 20.0) * 10.0), 2)
```
---
## 📊 MODIFIED REPORT FORMAT
### New Report Structure (ALWAYS показывает оба направления)
```markdown
# 🔍 ГЛУБОКИЙ АНАЛИЗ РЫНКА
## 📊 Рыночный Режим
• Тип: **Strong Bull** (confidence: 85%)
• BTC недельное изменение: **+8.2%**
• ADX: **31.5** (strong trend)
• Волатильность: **Normal**
**Торговые Импликации:**
Relax LONG thresholds (6.0), tighten SHORT thresholds (8.5). Follow the trend.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
## 📈 LONG ВОЗМОЖНОСТИ (Топ 3)
### 1. ETH/USDT - 🟢 Elite Tier [Rank #1]
**Score:** 8.5/10 | **Probability:** 78% | **R:R:** 1:2.8
**Entry Plan:**
• Entry: $3,120
• Stop-Loss: $3,085
• Take-Profit: $3,218
• Position Size: 1.0x (full risk)
**Tier:** Elite ✅
**Recommendation:** ✅ ОТЛИЧНЫЙ SETUP - рекомендуется к исполнению
**Key Factors:**
• Multi-TF alignment (4/4 timeframes bullish)
• Strong volume confirmation (2.3x average)
• Bullish Engulfing pattern (78% success rate)
• At EMA200 support confluence
**Warnings:** None
---
### 2. SOL/USDT - 🟡 Professional Tier [Rank #2]
**Score:** 7.2/10 | **Probability:** 71% | **R:R:** 1:2.4
**Entry Plan:**
• Entry: $105.50
• Stop-Loss: $103.80
• Take-Profit: $109.58
**Tier:** Professional ⚠️
**Recommendation:** ✅ ХОРОШИЙ SETUP - осторожно рекомендуется
**Warning:** ⚠️ Professional tier - уменьшите размер позиции
---
### 3. AVAX/USDT - 🟠 Speculative Tier [Rank #3]
**Score:** 6.3/10 | **Probability:** 62% | **R:R:** 1:2.1
**Tier:** Speculative ⚠️⚠️
**Recommendation:** ⚠️ ВЫСОКИЙ РИСК - не рекомендуется или paper trade
**Warning:** ⚠️⚠️ Score 6.3 ниже порога 6.5 - высокий риск
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
## 📉 SHORT ВОЗМОЖНОСТИ (Топ 3)
### 1. DOGE/USDT - 🟠 Speculative Tier [Rank #1]
**Score:** 5.8/10 | **Probability:** 58% | **R:R:** 1:2.2
**Entry Plan:**
• Entry: $0.0875
• Stop-Loss: $0.0892
• Take-Profit: $0.0837
**Tier:** Speculative ⚠️⚠️
**Recommendation:** ⚠️ ПРИЕМЛЕМО - только для опытных, минимальный размер
**Warning:** ⚠️⚠️ Score 5.8 значительно ниже порога 8.5 - высокий риск
**Regime Warning:** 🔴 ПРОТИВ ТРЕНДА: BTC в сильном uptrend, SHORT рискован
---
### 2. XRP/USDT - 🔴 High Risk [Rank #2]
**Score:** 4.5/10 | **Probability:** 51% | **R:R:** 1:1.8
**Tier:** High Risk 🔴
**Recommendation:** ⚠️ ВЫСОКИЙ РИСК - не рекомендуется или paper trade
**Warning:** ⚠️⚠️ Score 4.5 ЗНАЧИТЕЛЬНО ниже порога 8.5
**Regime Warning:** 🔴 ПРОТИВ ТРЕНДА: BTC в сильном uptrend, SHORT очень рискован
---
### 3. LTC/USDT - 🔴 High Risk [Rank #3]
**Score:** 4.2/10 | **Probability:** 49% | **R:R:** 1:1.9
**Tier:** High Risk 🔴
**Recommendation:** 🔴 НЕ РЕКОМЕНДУЕТСЯ - Низкая confluence
**Warning:** ⚠️⚠️⚠️ Все факторы против тренда
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
## 💡 СРАВНЕНИЕ НАПРАВЛЕНИЙ
**LONG возможности:**
• Найдено: 45 возможностей
• Elite tier: 3
• Professional tier: 12
• Best score: 8.5/10 (ETH/USDT)
**SHORT возможности:**
• Найдено: 8 возможностей
• Elite tier: 0
• Professional tier: 0
• Best score: 5.8/10 (DOGE/USDT)
**Текущий вывод:**
LONG направление ЗНАЧИТЕЛЬНО сильнее в текущем Strong Bull режиме. SHORT возможности существуют, но крайне рискованны и против основного тренда. Рекомендуется фокусироваться на LONG позициях.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
## ✅ ФИНАЛЬНАЯ РЕКОМЕНДАЦИЯ
**PRIMARY:** ETH/USDT LONG (Elite tier, 8.5/10) - ОТЛИЧНАЯ возможность
**SECONDARY:** SOL/USDT LONG (Professional tier, 7.2/10) - Хорошая возможность с уменьшенным размером
**AVOID:** Все SHORT позиции в текущем режиме (против тренда, высокий риск)
**Следующее обновление:** Через 12 часов
```
---
## 🎯 IMPLEMENTATION ROADMAP
### Phase 1: Core Architecture (Week 1)
- [ ] Implement TierClassifier
- [ ] Implement AdaptiveThresholds
- [ ] Implement RegimeDetector
- [ ] Implement SmartDisplay
- [ ] Remove all hard filters from market_scanner
- [ ] Update scoring normalization (immediate, not late)
### Phase 2: Integration & Testing (Week 2)
- [ ] Integrate new modules into market_scanner
- [ ] Update autonomous_analyzer to use new logic
- [ ] Update detailed_formatter for new report structure
- [ ] Test with real market data
- [ ] Verify BOTH directions always shown
- [ ] Performance testing
### Phase 3: ML Enhancement (Week 3)
- [ ] Implement MLProbabilityPredictor
- [ ] Create training pipeline from SignalTracker
- [ ] Train initial model with historical data
- [ ] Integrate ML probability into scoring
- [ ] A/B test ML vs static probability
### Phase 4: Monitoring & Metrics (Week 4)
- [ ] Build performance dashboard
- [ ] Track signal quality metrics
- [ ] Implement continuous learning loop
- [ ] Add performance alerts
- [ ] Documentation updates
---
## 📈 SUCCESS METRICS
### System Performance Targets
**Signal Quality:**
- ✅ Elite tier: 75%+ win rate, 2.5+ avg R:R
- ✅ Professional tier: 65%+ win rate, 2.0+ avg R:R
- ✅ Overall EV: Positive across all complete tasks
- ✅ No empty reports: 100% uptime
**User Experience:**
- ✅ ALWAYS shows opportunities (both directions)
- ✅ Clear tier communication (Elite/Professional/Speculative)
- ✅ Actionable insights every run
- ✅ Response time <30 seconds
**Market Coverage:**
- ✅ Finds opportunities in ANY regime (bull/bear/sideways)
- ✅ Adaptive to changing conditions
- ✅ 80%+ of tradeable assets covered
---
## 🔄 MIGRATION STRATEGY
### Step 1: Backup Current System
```bash
git commit -am "Backup before institutional upgrade"
git tag v2.0-backup
```
### Step 2: Create New Modules
- Create all new files in `mcp_server/`
- Don't modify existing files yet
### Step 3: Test New Modules Independently
- Unit tests for TierClassifier
- Unit tests for RegimeDetector
- Unit tests for AdaptiveThresholds
### Step 4: Gradual Integration
- Start with read-only integration (log but don't use)
- Compare old vs new results
- Validate performance
- Switch to new system when confident
### Step 5: Monitoring Period
- Run both systems in parallel for 1 week
- Track performance metrics
- Compare signal quality
- Make adjustments
### Step 6: Full Cutover
- Disable old system
- Enable new system exclusively
- Monitor closely for 48 hours
---
## 🚨 CRITICAL REMINDERS
1. **NEVER return empty results**
- ALWAYS show TOP-3 LONG and TOP-3 SHORT
- Add warnings if quality is low
- Educate user on why opportunities are limited
2. **ALWAYS normalize scores early**
- Don't waste resources on un-normalized data
- Consistent 0-10 scale throughout
3. **ALWAYS show both directions**
- CRITICAL_REQUIREMENTS.md compliance
- User needs full market perspective
4. **ALWAYS classify by tier**
- Clear Elite/Professional/Speculative/High Risk
- Never ambiguous "maybe" recommendations
5. **ALWAYS adapt to market regime**
- Bull market ≠ bear market
- Dynamic thresholds based on conditions
---
## 📚 APPENDIX: Code Templates
All essential code provided above in sections 1-5.
Additional helper functions and utilities available in:
- `mcp_server/score_normalizer.py` (existing)
- `mcp_server/signal_tracker.py` (existing)
- `mcp_server/validation_engine.py` (existing)
---
## ✅ FINAL CHECKLIST
Before deployment, verify:
- [ ] All hard filters removed
- [ ] Tier classification implemented
- [ ] Adaptive thresholds working
- [ ] Regime detection accurate
- [ ] Both directions ALWAYS shown
- [ ] Smart display logic tested
- [ ] ML probability integrated (optional)
- [ ] Performance metrics tracking
- [ ] Documentation updated
- [ ] Tests passing
- [ ] User feedback collected
---
**Version:** 3.0 INSTITUTIONAL
**Status:** READY FOR IMPLEMENTATION
**Expected Impact:** Transform to institutional-grade with 70%+ win rate
*"Excellence is not a destination, it's a continuous journey. This system will never return empty - it will always guide the trader with clarity and confidence."*