#!/usr/bin/env python3
"""
Crypto Price Prediction MCP Server
This server provides tools for predicting cryptocurrency price movements
using historical data from Binance API.
"""
import json
import logging
from datetime import datetime, timedelta, timezone
import pytz
from typing import Any, Dict, List, Optional
import time
import httpx
import numpy as np
import pandas as pd
from mcp.server.fastmcp import FastMCP
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, ExtraTreesClassifier, VotingClassifier
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.model_selection import cross_val_score, train_test_split, TimeSeriesSplit
from sklearn.metrics import accuracy_score, classification_report
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
import talib
import re
from urllib.parse import quote
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastMCP server
mcp = FastMCP("Crypto Predictor")
class CryptoPredictionService:
def __init__(self):
self.base_url = "https://api.binance.com"
self.client = httpx.AsyncClient(timeout=30.0)
self.models = {}
self.scalers = {}
self.cache = {}
self.cache_ttl = 300 # 5 minutes
self.rate_limit = 1200 # Binance limit per minute
self.request_timestamps = []
self.news_cache = {}
self.news_cache_ttl = 900 # 15 minutes for news
def _get_prediction_time_window(self, interval: str, current_hour_timestamp: int) -> Dict[str, str]:
"""Calculate exact prediction time window in ET timezone"""
try:
# Convert timestamp to datetime
dt = datetime.fromtimestamp(current_hour_timestamp / 1000, tz=timezone.utc)
# Convert to ET timezone
et_tz = pytz.timezone('US/Eastern')
dt_et = dt.astimezone(et_tz)
# Calculate interval duration
interval_minutes = {
'1m': 1, '3m': 3, '5m': 5, '15m': 15, '30m': 30,
'1h': 60, '2h': 120, '4h': 240
}.get(interval, 60)
# Calculate end time
end_dt = dt_et + timedelta(minutes=interval_minutes)
# Format times
start_time = dt_et.strftime("%B %d, %I:%M%p ET").replace(" 0", " ")
end_time = end_dt.strftime("%I:%M%p ET").lstrip("0")
current_time = datetime.now(et_tz).strftime("%B %d, %I:%M:%S%p ET").replace(" 0", " ")
# Create prediction window description
if interval_minutes == 60:
prediction_window = f"{dt_et.strftime('%B %d, %I').replace(' 0', ' ')}-{end_dt.strftime('%I%p ET').lstrip('0')}"
else:
prediction_window = f"{start_time} - {end_time}"
return {
"prediction_window": prediction_window,
"start_time": start_time,
"end_time": end_time,
"current_time": current_time,
"interval_minutes": interval_minutes
}
except Exception as e:
# Fallback to basic format
return {
"prediction_window": f"Current {interval} period",
"start_time": "Unknown",
"end_time": "Unknown",
"current_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"interval_minutes": 60
}
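# Illustrative result for a 1h candle opening at 2:00PM ET (actual values depend on the clock):
# {"prediction_window": "June 5, 2-3PM ET", "start_time": "June 5, 2:00PM ET",
#  "end_time": "3:00PM ET", "current_time": "June 5, 2:41:07PM ET", "interval_minutes": 60}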
def _validate_symbol(self, symbol: str) -> str:
"""Validate and format trading pair symbol"""
symbol = symbol.upper().strip()
if not symbol.endswith('USDT') and not symbol.endswith('BTC') and not symbol.endswith('ETH'):
if 'USDT' not in symbol and 'BTC' not in symbol:
symbol += 'USDT'
return symbol
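# Illustrative normalization: "sol" -> "SOLUSDT", "ethusdt" -> "ETHUSDT";
# symbols already quoted in USDT, BTC or ETH are returned unchanged.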
def _validate_interval(self, interval: str) -> str:
"""Validate time interval"""
valid_intervals = ['1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '6h', '8h', '12h', '1d', '3d', '1w', '1M']
if interval not in valid_intervals:
raise ValueError(f"Invalid interval. Must be one of: {', '.join(valid_intervals)}")
return interval
def _check_rate_limit(self):
"""Check if we're within rate limits"""
now = time.time()
# Remove timestamps older than 1 minute
self.request_timestamps = [ts for ts in self.request_timestamps if now - ts < 60]
if len(self.request_timestamps) >= self.rate_limit:
raise Exception("Rate limit exceeded. Please wait before making more requests.")
self.request_timestamps.append(now)
def _get_cache_key(self, symbol: str, interval: str, limit: int) -> str:
"""Generate cache key"""
return f"{symbol}_{interval}_{limit}"
def _is_cache_valid(self, cache_key: str) -> bool:
"""Check if cached data is still valid"""
if cache_key not in self.cache:
return False
return time.time() - self.cache[cache_key]['timestamp'] < self.cache_ttl
async def get_historical_data(self, symbol: str, interval: str, limit: int = 100) -> List[Dict]:
"""Fetch historical kline data from Binance with caching and rate limiting"""
try:
# Validate inputs
symbol = self._validate_symbol(symbol)
interval = self._validate_interval(interval)
# Check cache first
cache_key = self._get_cache_key(symbol, interval, limit)
if self._is_cache_valid(cache_key):
return self.cache[cache_key]['data']
# Check rate limit
self._check_rate_limit()
url = f"{self.base_url}/api/v3/klines"
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
response = await self.client.get(url, params=params)
response.raise_for_status()
data = response.json()
if not data:
raise ValueError(f"No data returned for {symbol}")
# Convert to structured format
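# Each raw Binance kline is a positional array:
# [open_time, open, high, low, close, volume, close_time,
#  quote_asset_volume, trade_count, taker_buy_base_vol, taker_buy_quote_vol, ignore]
# The indices below follow that layout.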
klines = []
for item in data:
try:
klines.append({
"open_time": int(item[0]),
"open": float(item[1]),
"high": float(item[2]),
"low": float(item[3]),
"close": float(item[4]),
"volume": float(item[5]),
"close_time": int(item[6]),
"quote_volume": float(item[7]),
"count": int(item[8]),
"taker_buy_base": float(item[9]),
"taker_buy_quote": float(item[10])
})
except (ValueError, IndexError) as e:
logger.warning(f"Skipping invalid data point: {e}")
continue
# Cache the results
self.cache[cache_key] = {
'data': klines,
'timestamp': time.time()
}
return klines
except httpx.HTTPStatusError as e:
if e.response.status_code == 400:
raise ValueError(f"Invalid symbol or parameters: {symbol}")
elif e.response.status_code == 429:
raise Exception("Rate limit exceeded by Binance. Please wait.")
else:
raise Exception(f"API error {e.response.status_code}: {e.response.text}")
except Exception as e:
logger.error(f"Error fetching data for {symbol}: {e}")
raise
def _is_period_incomplete(self, period_data: Dict) -> bool:
"""Check if a trading period is incomplete based on its close_time vs current time"""
close_time = period_data['close_time']
current_time = time.time() * 1000 # Convert to milliseconds
# A period is incomplete if its close_time is in the future
return close_time > current_time
def _filter_complete_periods(self, data: List[Dict]) -> List[Dict]:
"""Filter out incomplete periods from the data"""
if not data:
return data
# Check if the last period is incomplete
if self._is_period_incomplete(data[-1]):
return data[:-1] # Remove incomplete period
else:
return data # All periods are complete
def _ensure_json_serializable(self, df: pd.DataFrame) -> pd.DataFrame:
"""Ensure all values in dataframe are JSON serializable by converting numpy types to Python types"""
for col in df.columns:
if df[col].dtype == 'bool':
df[col] = df[col].astype(bool) # Convert numpy bool to python bool
elif df[col].dtype in ['int8', 'int16', 'int32', 'int64']:
df[col] = df[col].astype(int) # Convert numpy int to python int
elif df[col].dtype in ['float16', 'float32', 'float64']:
df[col] = df[col].astype(float) # Convert numpy float to python float
return df
def create_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Create comprehensive technical indicators and features for prediction"""
try:
# Convert to numpy arrays for TA-Lib
high = df['high'].values
low = df['low'].values
close = df['close'].values
volume = df['volume'].values
# Price-based features
df['price_change'] = df['close'].pct_change()
df['volatility'] = df['price_change'].rolling(window=10).std()
df['price_acceleration'] = df['price_change'].diff()
# Enhanced Moving Averages with more periods
df['sma_5'] = df['close'].rolling(window=5).mean()
df['sma_10'] = df['close'].rolling(window=10).mean()
df['sma_20'] = df['close'].rolling(window=20).mean()
df['sma_50'] = df['close'].rolling(window=50).mean() if len(df) >= 50 else df['close'].rolling(window=min(len(df), 20)).mean()
df['ema_5'] = df['close'].ewm(span=5).mean()
df['ema_12'] = df['close'].ewm(span=12).mean()
df['ema_26'] = df['close'].ewm(span=26).mean()
df['ema_50'] = df['close'].ewm(span=50).mean()
# Moving average convergence/divergence signals (add small epsilon to prevent division by zero)
df['sma_5_20_ratio'] = df['sma_5'] / (df['sma_20'] + 1e-8)
df['ema_12_26_ratio'] = df['ema_12'] / (df['ema_26'] + 1e-8)
df['price_vs_sma20'] = df['close'] / (df['sma_20'] + 1e-8)
df['price_vs_ema50'] = df['close'] / (df['ema_50'] + 1e-8)
# MACD
df['macd'] = df['ema_12'] - df['ema_26']
df['macd_signal'] = df['macd'].ewm(span=9).mean()
df['macd_histogram'] = df['macd'] - df['macd_signal']
# RSI with multiple periods
try:
df['rsi_14'] = pd.Series(talib.RSI(close, timeperiod=14), index=df.index)
df['rsi_7'] = pd.Series(talib.RSI(close, timeperiod=7), index=df.index)
except:
# Fallback calculation for RSI
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / (loss + 1e-8) # Prevent division by zero
df['rsi_14'] = 100 - (100 / (1 + rs))
# Calculate RSI-7 separately
gain_7 = (delta.where(delta > 0, 0)).rolling(window=7).mean()
loss_7 = (-delta.where(delta < 0, 0)).rolling(window=7).mean()
rs_7 = gain_7 / (loss_7 + 1e-8)
df['rsi_7'] = 100 - (100 / (1 + rs_7))
# Bollinger Bands
bb_period = 20
bb_std = 2
df['bb_middle'] = df['close'].rolling(window=bb_period).mean()
bb_std_dev = df['close'].rolling(window=bb_period).std()
df['bb_upper'] = df['bb_middle'] + (bb_std_dev * bb_std)
df['bb_lower'] = df['bb_middle'] - (bb_std_dev * bb_std)
df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / (df['bb_middle'] + 1e-8)
df['bb_position'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'] + 1e-8)
# Stochastic Oscillator
try:
stoch_k, stoch_d = talib.STOCH(high, low, close)
df['stoch_k'] = pd.Series(stoch_k, index=df.index)
df['stoch_d'] = pd.Series(stoch_d, index=df.index)
except:
# Fallback calculation
lowest_low = df['low'].rolling(window=14).min()
highest_high = df['high'].rolling(window=14).max()
df['stoch_k'] = 100 * (df['close'] - lowest_low) / (highest_high - lowest_low + 1e-8)
df['stoch_d'] = df['stoch_k'].rolling(window=3).mean()
# Williams %R
try:
df['williams_r'] = pd.Series(talib.WILLR(high, low, close), index=df.index)
except:
highest_high = df['high'].rolling(window=14).max()
lowest_low = df['low'].rolling(window=14).min()
df['williams_r'] = -100 * (highest_high - df['close']) / (highest_high - lowest_low + 1e-8)
# Average True Range (ATR)
try:
df['atr'] = pd.Series(talib.ATR(high, low, close), index=df.index)
except:
df['tr1'] = df['high'] - df['low']
df['tr2'] = abs(df['high'] - df['close'].shift(1))
df['tr3'] = abs(df['low'] - df['close'].shift(1))
df['atr'] = df[['tr1', 'tr2', 'tr3']].max(axis=1).rolling(window=14).mean()
df.drop(['tr1', 'tr2', 'tr3'], axis=1, inplace=True)
# Volume indicators
df['volume_sma'] = df['volume'].rolling(window=20).mean()
df['volume_ratio'] = df['volume'] / (df['volume_sma'] + 1e-8)
df['volume_rate_of_change'] = df['volume'].pct_change()
# On-Balance Volume (OBV)
df['obv'] = (df['volume'] * df['close'].diff().apply(lambda x: 1 if x > 0 else -1 if x < 0 else 0)).cumsum()
df['obv_sma'] = df['obv'].rolling(window=10).mean()
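# OBV adds the candle's volume when the close rises and subtracts it when the close
# falls, producing a cumulative buying/selling-pressure line.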
# Price patterns
df['high_low_ratio'] = df['high'] / (df['low'] + 1e-8)
df['close_position'] = (df['close'] - df['low']) / (df['high'] - df['low'] + 1e-8)
df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
df['body_size'] = abs(df['close'] - df['open']) / (df['close'] + 1e-8)
# Momentum indicators
df['momentum_10'] = df['close'] / (df['close'].shift(10) + 1e-8)
df['rate_of_change'] = df['close'].pct_change(periods=10)
# Support and Resistance levels
df['resistance_level'] = df['high'].rolling(window=20).max()
df['support_level'] = df['low'].rolling(window=20).min()
df['distance_to_resistance'] = (df['resistance_level'] - df['close']) / (df['close'] + 1e-8)
df['distance_to_support'] = (df['close'] - df['support_level']) / (df['close'] + 1e-8)
# Enhanced Market structure and trend analysis
df['higher_high'] = (df['high'] > df['high'].shift(1)).astype(int)
df['lower_low'] = (df['low'] < df['low'].shift(1)).astype(int)
df['trend_strength'] = df['higher_high'].rolling(window=5).sum() - df['lower_low'].rolling(window=5).sum()
# Fractal patterns removed due to look-ahead bias
# Price momentum across multiple timeframes
df['momentum_3'] = df['close'] / (df['close'].shift(3) + 1e-8) - 1
df['momentum_5'] = df['close'] / (df['close'].shift(5) + 1e-8) - 1
if len(df) >= 20:
df['momentum_20'] = df['close'] / (df['close'].shift(20) + 1e-8) - 1
else:
df['momentum_20'] = 0
# Velocity and acceleration
df['price_velocity'] = df['close'].diff().rolling(window=3).mean()
df['price_acceleration'] = df['price_velocity'].diff()
# Volatility-adjusted metrics
df['sharpe_ratio'] = df['price_change'].rolling(window=20).mean() / (df['volatility'] + 1e-8)
df['volatility_ratio'] = df['volatility'] / (df['volatility'].rolling(window=20).mean() + 1e-8)
# Enhanced Market regime indicators
df['volatility_regime'] = (df['volatility'] > df['volatility'].rolling(window=50).quantile(0.7)).astype(int)
df['trend_regime'] = (abs(df['trend_strength']) > 2).astype(int)
df['bull_market'] = (df['close'] > df['sma_50']).astype(int) if len(df) >= 50 else (df['close'] > df['sma_20']).astype(int)
df['volatility_breakout'] = (df['volatility'] > df['volatility'].rolling(20).quantile(0.8)).astype(int)
# Price action patterns
df['doji'] = ((abs(df['open'] - df['close']) / (df['high'] - df['low'] + 1e-8)) < 0.1).astype(int)
df['hammer'] = ((df['close'] > df['open']) & (df['lower_shadow'] > 2 * df['body_size'])).astype(int)
df['shooting_star'] = ((df['close'] < df['open']) & (df['upper_shadow'] > 2 * df['body_size'])).astype(int)
# Trend alignment features
df['trend_alignment'] = (df['ema_12'] > df['ema_26']).astype(int)
df['ma_alignment'] = ((df['sma_5'] > df['sma_10']) & (df['sma_10'] > df['sma_20'])).astype(int)
# Enhanced momentum features
df['momentum_acceleration'] = df['momentum_5'].diff()
df['rsi_momentum'] = df['rsi_14'].diff()
df['volume_momentum'] = df['volume_ratio'].rolling(5).mean()
# Time-based features using cyclical encoding
timestamps = pd.DatetimeIndex(pd.to_datetime(df['open_time'], unit='ms'))
df['hour'] = timestamps.hour
df['day_of_week'] = timestamps.dayofweek
df['month'] = timestamps.month
# Cyclical encoding for time features (preserves time relationships)
df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
df['dow_sin'] = np.sin(2 * np.pi * df['day_of_week'] / 7)
df['dow_cos'] = np.cos(2 * np.pi * df['day_of_week'] / 7)
df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)
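# e.g. hour 23 encodes to (sin, cos) ≈ (-0.26, 0.97) while hour 0 encodes to (0.0, 1.0),
# so 11PM and midnight stay adjacent instead of sitting 23 units apart.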
# Drop raw time columns as we're using cyclical encoding
df = df.drop(['hour', 'day_of_week', 'month'], axis=1)
# Target: 1 if next period close is higher than current period close, 0 if lower
# This predicts price direction change from current period to next period
df['target'] = (df['close'].shift(-1) > df['close']).astype(int)
# Normalize all features for consistent scaling
df = self._normalize_features(df)
# Ensure all values are JSON serializable
df = self._ensure_json_serializable(df)
return df
except Exception as e:
logger.error(f"Error creating features: {e}")
# Fallback to basic features
df['price_change'] = df['close'].pct_change()
df['sma_20'] = df['close'].rolling(window=20).mean()
df['target'] = (df['close'].shift(-1) > df['close']).astype(int)
df = self._normalize_features(df)
df = self._ensure_json_serializable(df)
return df
def _normalize_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Normalize all technical indicators and features for consistent scaling"""
try:
# Define feature categories for different normalization strategies
# Already normalized features (0-100 scale or ratios)
already_normalized = [
'rsi_14', 'rsi_7', 'stoch_k', 'stoch_d', 'williams_r',
'bb_position', 'close_position', 'higher_high', 'lower_low',
'volatility_regime', 'trend_regime',
'target' # Don't normalize target
]
# Ratio features (should be standardized around 1.0)
ratio_features = [
'sma_5_20_ratio', 'ema_12_26_ratio', 'price_vs_sma20', 'price_vs_ema50',
'high_low_ratio', 'volume_ratio', 'momentum_10', 'volatility_ratio'
]
# Percentage change features (already in percentage form)
pct_features = [
'price_change', 'price_acceleration', 'volume_rate_of_change',
'momentum_3', 'momentum_5', 'momentum_20', 'rate_of_change',
'distance_to_resistance', 'distance_to_support'
]
# Price-based features (need robust scaling due to outliers)
price_features = [
'sma_5', 'sma_10', 'sma_20', 'sma_50', 'ema_5', 'ema_12', 'ema_26', 'ema_50',
'bb_middle', 'bb_upper', 'bb_lower', 'resistance_level', 'support_level',
'upper_shadow', 'lower_shadow', 'price_velocity'
]
# Volume and OBV features (log transformation + standardization)
volume_features = [
'volume_sma', 'obv', 'obv_sma'
]
# Technical indicators (standardization)
technical_features = [
'macd', 'macd_signal', 'macd_histogram', 'volatility', 'bb_width',
'atr', 'body_size', 'trend_strength', 'sharpe_ratio'
]
# Create a copy to avoid modifying original
df_normalized = df.copy()
# 1. Handle infinite values and extreme outliers
for col in df_normalized.columns:
if col not in already_normalized and df_normalized[col].dtype in ['float64', 'int64']:
# Replace infinite values with NaN
df_normalized[col] = df_normalized[col].replace([np.inf, -np.inf], np.nan)
# Cap extreme outliers at 99.5th percentile
if not df_normalized[col].isna().all():
upper_bound = df_normalized[col].quantile(0.995)
lower_bound = df_normalized[col].quantile(0.005)
df_normalized[col] = np.clip(df_normalized[col], lower_bound, upper_bound)
# 2. Normalize ratio features (center around 1.0, scale by std)
for feature in ratio_features:
if feature in df_normalized.columns and not df_normalized[feature].isna().all():
# Log transform ratios to handle multiplicative relationships
df_normalized[f'{feature}_log'] = np.log(df_normalized[feature] + 1e-8)
# Remove original ratio feature to avoid redundancy
df_normalized.drop(feature, axis=1, inplace=True)
# 3. Standardize percentage features (z-score normalization)
for feature in pct_features:
if feature in df_normalized.columns and not df_normalized[feature].isna().all():
mean_val = df_normalized[feature].mean()
std_val = df_normalized[feature].std()
if std_val > 0:
df_normalized[feature] = (df_normalized[feature] - mean_val) / std_val
# 4. Robust scaling for price features (less sensitive to outliers)
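# 1.4826 * MAD approximates the standard deviation for normally distributed data,
# so this behaves like a z-score while being far less sensitive to outliers.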
for feature in price_features:
if feature in df_normalized.columns and not df_normalized[feature].isna().all():
median_val = df_normalized[feature].median()
mad = np.median(np.abs(df_normalized[feature] - median_val))
if mad > 0:
df_normalized[feature] = (df_normalized[feature] - median_val) / (1.4826 * mad)
# 5. Log transform + standardize volume features
for feature in volume_features:
if feature in df_normalized.columns and not df_normalized[feature].isna().all():
# Log transform (add small constant to handle zeros)
min_positive = df_normalized[feature][df_normalized[feature] > 0].min()
log_feature = np.log(df_normalized[feature] + min_positive / 10)
# Standardize the log values
mean_val = log_feature.mean()
std_val = log_feature.std()
if std_val > 0:
df_normalized[feature] = (log_feature - mean_val) / std_val
# 6. Standard scaling for technical indicators
for feature in technical_features:
if feature in df_normalized.columns and not df_normalized[feature].isna().all():
mean_val = df_normalized[feature].mean()
std_val = df_normalized[feature].std()
if std_val > 0:
df_normalized[feature] = (df_normalized[feature] - mean_val) / std_val
# 7. Min-max scaling for RSI and oscillators (keep in 0-1 range)
oscillator_features = ['rsi_14', 'rsi_7', 'stoch_k', 'stoch_d']
for feature in oscillator_features:
if feature in df_normalized.columns and not df_normalized[feature].isna().all():
df_normalized[feature] = df_normalized[feature] / 100.0 # Convert to 0-1 scale
# 8. Handle Williams %R (convert from -100,0 to 0,1 scale)
if 'williams_r' in df_normalized.columns:
df_normalized['williams_r'] = (df_normalized['williams_r'] + 100) / 100.0
# 9. Fill remaining NaN values with rolling median (conservative approach)
for col in df_normalized.columns:
if col not in already_normalized and df_normalized[col].dtype in ['float64', 'int64']:
# Use rolling median with expanding window for early values
rolling_median = df_normalized[col].expanding().median()
df_normalized[col] = df_normalized[col].fillna(rolling_median)
# If still NaN (all NaN column), fill with 0
df_normalized[col] = df_normalized[col].fillna(0)
# 10. Ensure no features exceed reasonable bounds after normalization
# Exclude price data columns from clipping
price_columns = ['open', 'high', 'low', 'close', 'volume', 'open_time', 'close_time', 'quote_volume', 'count', 'taker_buy_base', 'taker_buy_quote']
for col in df_normalized.columns:
if col not in already_normalized and col not in price_columns and df_normalized[col].dtype in ['float64', 'int64']:
# Cap normalized values at ±5 standard deviations
df_normalized[col] = np.clip(df_normalized[col], -5, 5)
logger.info(f"Normalized {len(df_normalized.columns)} features successfully")
return df_normalized
except Exception as e:
logger.error(f"Error normalizing features: {e}")
# Return original dataframe if normalization fails
return df
def prepare_training_data(self, df: pd.DataFrame) -> tuple:
"""Prepare features and target for model training with comprehensive feature set"""
# Filter to only include columns that exist in the dataframe
available_features = [col for col in FEATURE_COLUMNS if col in df.columns]
if len(available_features) < 5:
raise ValueError(f"Not enough features available. Found: {available_features}")
# Remove rows with NaN values (last row will have NaN target automatically)
df_clean = df[available_features + ['target']].dropna()
if len(df_clean) < 100:
logger.warning(f"Limited training data: {len(df_clean)} samples")
if len(df_clean) < 50:
raise ValueError(f"Insufficient clean data for training: {len(df_clean)} samples")
X = df_clean[available_features].values
y = df_clean['target'].values
# Feature importance logging
feature_names = available_features
logger.info(f"Training with {len(feature_names)} features: {', '.join(feature_names[:10])}...")
return X, y, feature_names
async def train_model(self, symbol: str, interval: str = "1h", limit: int = 1000, model_type: str = "ensemble"):
"""Train enhanced prediction model with cross-validation"""
try:
symbol = self._validate_symbol(symbol)
interval = self._validate_interval(interval)
# Get historical data (request extra to account for incomplete current period)
data = await self.get_historical_data(symbol, interval, limit + 1)
if len(data) < 100:
raise ValueError(f"Insufficient data: {len(data)} samples")
# Filter out incomplete periods from training data
training_data = self._filter_complete_periods(data)
if len(training_data) < 100:
raise ValueError(f"Insufficient training data after removing current period: {len(training_data)} samples")
df = pd.DataFrame(training_data)
# Create features
df = self.create_features(df)
# Prepare training data
X, y, feature_names = self.prepare_training_data(df)
if len(np.unique(y)) < 2:
raise ValueError("Insufficient class diversity in target variable")
# Use ALL features - no feature selection (better accuracy based on A/B testing)
selected_feature_names = feature_names
X_selected = X
selector = None
high_corr_features = [] # No features removed
# Robust scaling for better handling of outliers
scaler = RobustScaler()
X_scaled = scaler.fit_transform(X_selected)
# Simplified ensemble with diverse algorithms
models = {}
# Enhanced Gradient Boosting with improved parameters
gb_model = GradientBoostingClassifier(
n_estimators=200, # Increased from 100 for better pattern learning
learning_rate=0.05, # Reduced from 0.1 for better generalization
max_depth=4, # Reduced from 6 to prevent overfitting
subsample=0.9, # Increased from 0.8 to use more data
min_samples_split=10, # Added constraint to prevent overfitting
min_samples_leaf=5, # Added constraint for smoother predictions
random_state=42,
validation_fraction=0.1,
n_iter_no_change=15 # Increased patience
)
# Improved SVM with tuned parameters
svm_model = SVC(
kernel='rbf',
C=0.5, # Reduced from 1.0 for less aggressive fitting
gamma='auto', # Changed from 'scale' for different kernel shape
class_weight='balanced',
random_state=42,
probability=True
)
# Add Random Forest for model diversity
rf_model = RandomForestClassifier(
n_estimators=150,
max_depth=8,
min_samples_split=5,
min_samples_leaf=2,
bootstrap=True,
random_state=42,
n_jobs=-1 # Use all cores
)
# Train models with time series cross-validation
tscv = TimeSeriesSplit(n_splits=5)
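# TimeSeriesSplit keeps folds in chronological order: each split trains on earlier
# candles and validates on later ones, so the CV scores below are not inflated by
# look-ahead leakage.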
gb_model.fit(X_scaled, y)
svm_model.fit(X_scaled, y)
rf_model.fit(X_scaled, y)
models['gradient_boosting'] = gb_model
models['svm'] = svm_model
models['random_forest'] = rf_model
# Time series cross-validation scores
gb_cv_scores = cross_val_score(gb_model, X_scaled, y, cv=tscv, scoring='accuracy')
svm_cv_scores = cross_val_score(svm_model, X_scaled, y, cv=tscv, scoring='accuracy')
rf_cv_scores = cross_val_score(rf_model, X_scaled, y, cv=tscv, scoring='accuracy')
# Calculate dynamic weights based on CV performance for 3 models
gb_mean_score = np.mean(gb_cv_scores)
svm_mean_score = np.mean(svm_cv_scores)
rf_mean_score = np.mean(rf_cv_scores)
# Normalize scores to create weights (higher performing model gets higher weight)
total_score = gb_mean_score + svm_mean_score + rf_mean_score
if total_score > 0:
gb_weight = gb_mean_score / total_score
svm_weight = svm_mean_score / total_score
rf_weight = rf_mean_score / total_score
else:
# Fallback to equal weights if all models perform poorly
gb_weight, svm_weight, rf_weight = 1/3, 1/3, 1/3
# Apply minimum weight threshold to prevent any model from dominating completely
min_weight = 0.2 # Reduced for 3 models
weights = [gb_weight, svm_weight, rf_weight]
# Ensure no weight falls below minimum
for i, weight in enumerate(weights):
if weight < min_weight:
weights[i] = min_weight
# Renormalize weights to sum to 1
weight_sum = sum(weights)
gb_weight, svm_weight, rf_weight = [w/weight_sum for w in weights]
# Create meta-ensemble (voting classifier) with dynamic weights
dynamic_weights = [gb_weight, svm_weight, rf_weight]
voting_clf = VotingClassifier(
estimators=[
('gb', gb_model),
('svm', svm_model),
('rf', rf_model)
],
voting='soft',
weights=dynamic_weights # Dynamic weights based on CV performance
)
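# Soft voting averages the three models' class probabilities using the dynamic
# weights above and predicts the class with the highest weighted probability.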
voting_clf.fit(X_scaled, y)
models['ensemble'] = voting_clf
# Feature importance from gradient boosting model
feature_importance = dict(zip(selected_feature_names, gb_model.feature_importances_))
top_features = sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)[:10]
# Store enhanced model info
self.models[symbol] = {
'models': models,
'feature_names': selected_feature_names,
'original_feature_names': feature_names,
'feature_selector': selector,
'feature_importance': feature_importance,
'model_type': model_type,
'interval': interval,
'cv_scores': {
'gradient_boosting': gb_cv_scores,
'svm': svm_cv_scores,
'random_forest': rf_cv_scores
},
'removed_features': high_corr_features
}
self.scalers[symbol] = scaler
return {
"symbol": symbol,
"gradient_boosting_accuracy": float(np.mean(gb_cv_scores)),
"svm_accuracy": float(np.mean(svm_cv_scores)),
"random_forest_accuracy": float(np.mean(rf_cv_scores)),
"ensemble_performance": {
"gb_std": float(np.std(gb_cv_scores)),
"svm_std": float(np.std(svm_cv_scores)),
"rf_std": float(np.std(rf_cv_scores)),
"dynamic_weights": {
"gradient_boosting": float(gb_weight),
"svm": float(svm_weight),
"random_forest": float(rf_weight)
}
},
"samples": len(X),
"features": len(selected_feature_names),
"removed_correlated_features": 0,
"top_features": [f"{name}: {importance:.4f}" for name, importance in top_features],
"class_distribution": {f"class_{i}": int(np.sum(y == i)) for i in np.unique(y)},
"model_trained": True,
"training_period": f"{len(data)} {interval} candles",
"selected_features": selected_feature_names[:10]
}
except Exception as e:
logger.error(f"Error training model for {symbol}: {e}")
raise
async def predict_current_hour(self, symbol: str, interval: str = "1h", training_periods: int = 100) -> Dict[str, Any]:
"""Predict if current hour will close higher or lower than it opened
Args:
symbol: Trading pair symbol
interval: Time interval for prediction
training_periods: Number of historical periods to use for training (default: 100)
"""
try:
symbol = self._validate_symbol(symbol)
interval = self._validate_interval(interval)
# Check if model exists and was trained for the same interval
if symbol not in self.models or self.models[symbol].get('interval') != interval:
await self.train_model(symbol, interval, training_periods)
# Get recent data - use same amount as training for consistency
data = await self.get_historical_data(symbol, interval, training_periods)
if len(data) < 50:
raise ValueError(f"Insufficient recent data: {len(data)} samples")
# Get current period information (use latest period whether complete or incomplete)
current_period_data = data[-1] # Latest period
is_current_incomplete = self._is_period_incomplete(current_period_data)
df = pd.DataFrame(self._filter_complete_periods(data))
current_period_open = float(current_period_data['open'])
current_price = float(current_period_data['close']) # Current price
# Calculate time window information
time_info = self._get_prediction_time_window(interval, current_period_data['open_time'])
# Create features
df = self.create_features(df)
# Get model info
model_info = self.models[symbol]
feature_names = model_info['feature_names']
models = model_info['models']
# Get latest complete features - use all features (no selection)
try:
# Use the original feature names from training to maintain consistency
original_feature_names = model_info['original_feature_names']
available_features = [col for col in original_feature_names if col in df.columns]
# Use the last complete row with all features (no filtering)
latest_features_selected = df[available_features].iloc[-1].values.reshape(1, -1)
except KeyError as e:
raise ValueError(f"Missing feature in prediction data: {e}")
if np.isnan(latest_features_selected).any():
raise ValueError("NaN values in latest features")
# Scale features using the same scaler as training
scaler = self.scalers[symbol]
latest_features_scaled = scaler.transform(latest_features_selected)
# Enhanced ensemble predictions with dynamic weighting
predictions = {}
probabilities = {}
stored_cv_scores = model_info.get('cv_scores', {})
# Calculate mean CV scores for individual models only
cv_scores = {}
for model_name in models.keys():
if model_name == 'ensemble':
# Calculate ensemble CV score as weighted average of individual model scores
individual_scores = []
individual_weights = []
# Get weights from the VotingClassifier (VotingClassifier stores weights in .weights attribute)
if hasattr(models['ensemble'], 'weights') and models['ensemble'].weights is not None:
voting_weights = models['ensemble'].weights
else:
# Fallback: try to get from training info or use equal weights
training_weights = model_info.get('ensemble_performance', {}).get('dynamic_weights', {})
if training_weights:
voting_weights = [
training_weights.get('gradient_boosting', 1/3),
training_weights.get('svm', 1/3),
training_weights.get('random_forest', 1/3)
]
else:
voting_weights = [1/3, 1/3, 1/3]
model_names = ['gradient_boosting', 'svm', 'random_forest']
for i, name in enumerate(model_names):
if name in stored_cv_scores:
individual_scores.append(np.mean(stored_cv_scores[name]))
individual_weights.append(voting_weights[i] if i < len(voting_weights) else 1/3)
if individual_scores:
# Weighted average of individual model CV scores
cv_scores[model_name] = np.average(individual_scores, weights=individual_weights)
else:
cv_scores[model_name] = 0.5 # Fallback
elif model_name in stored_cv_scores:
cv_scores[model_name] = np.mean(stored_cv_scores[model_name])
else:
cv_scores[model_name] = 0.5 # Default for missing scores
# Get predictions from all models
for model_name, model in models.items():
if model_name == 'ensemble': # Skip the meta-ensemble for individual predictions
continue
pred = model.predict(latest_features_scaled)[0]
prob = model.predict_proba(latest_features_scaled)[0]
predictions[model_name] = pred
probabilities[model_name] = prob
# Use the meta-ensemble if available, otherwise weighted ensemble
if 'ensemble' in models:
ensemble_pred = models['ensemble'].predict(latest_features_scaled)[0]
ensemble_prob = models['ensemble'].predict_proba(latest_features_scaled)[0]
ensemble_prob_up = float(ensemble_prob[1] if len(ensemble_prob) > 1 else 0.5)
ensemble_prob_down = 1 - ensemble_prob_up
ensemble_prediction = ensemble_pred
else:
# Fallback to weighted ensemble
total_score = sum(cv_scores.values())
weights = {name: score/total_score for name, score in cv_scores.items() if name != 'ensemble'}
ensemble_prob_up = sum(weights.get(name, 0) * (prob[1] if len(prob) > 1 else 0.5)
for name, prob in probabilities.items())
ensemble_prob_down = 1 - ensemble_prob_up
ensemble_prediction = 1 if ensemble_prob_up > 0.5 else 0
# Enhanced confidence scoring based on multiple factors
prob_diff = abs(ensemble_prob_up - ensemble_prob_down)
# Factor 1: Probability difference
conf_prob = min(prob_diff * 2, 1.0) # Normalize to 0-1
# Factor 2: Model agreement (how many models agree with ensemble)
agreement_count = sum(1 for pred in predictions.values() if pred == ensemble_prediction)
model_agreement = agreement_count / max(len(predictions), 1)
# Factor 3: CV score quality (higher average CV score = more confidence)
avg_cv_score = np.mean(list(cv_scores.values()))
cv_confidence = max(0, (avg_cv_score - 0.5) * 2) # Normalize 0.5-1 to 0-1
# Combined confidence score
combined_confidence = (conf_prob * 0.4 + model_agreement * 0.4 + cv_confidence * 0.2)
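# Illustrative scoring: conf_prob = 0.6 with 2 of 3 models agreeing (0.667) and
# cv_confidence = 0.2 gives 0.4*0.6 + 0.4*0.667 + 0.2*0.2 ≈ 0.55, i.e. "MEDIUM"
# when prob_diff exceeds 0.1.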
# Confidence levels with stricter thresholds
if combined_confidence > 0.7 and prob_diff > 0.2:
confidence_level = "HIGH"
elif combined_confidence > 0.5 and prob_diff > 0.1:
confidence_level = "MEDIUM"
else:
confidence_level = "LOW"
# current_price already set from current_period_data above
price_change_24h = float(((current_price - df['close'].iloc[-25]) / df['close'].iloc[-25]) * 100) if len(df) > 25 else 0
# Market conditions - get from latest complete features row
latest_complete_row = df.iloc[-1] if len(df) > 0 else {}
market_conditions = {
"rsi_oversold": bool(float(latest_complete_row.get('rsi_14', 50)) < 30),
"rsi_overbought": bool(float(latest_complete_row.get('rsi_14', 50)) > 70),
"high_volatility": bool(float(latest_complete_row.get('volatility', 0)) > float(df['volatility'].quantile(0.8)) if len(df) > 0 else False),
"strong_volume": bool(float(latest_complete_row.get('volume_ratio', 1)) > 1.5),
"near_resistance": bool(float(latest_complete_row.get('distance_to_resistance', 0)) < 0.02),
"near_support": bool(float(latest_complete_row.get('distance_to_support', 0)) < 0.02)
}
return {
"symbol": symbol,
"current_price": current_price,
"period_open": current_period_open,
"period_change": float((current_price - current_period_open) / current_period_open * 100),
"price_change_24h": price_change_24h,
"prediction": "UP" if ensemble_prediction == 1 else "DOWN",
"prediction_window": time_info["prediction_window"],
"prediction_for_time": time_info["end_time"],
"prediction_target": f"Next period will close higher than current period close (${current_price:.2f}) by {time_info['end_time']}",
"current_time_et": time_info["current_time"],
"time_remaining": f"Prediction valid until {time_info['end_time']}",
"confidence": float(max(ensemble_prob_up, ensemble_prob_down)),
"confidence_level": confidence_level,
"probability_up": float(ensemble_prob_up),
"probability_down": float(ensemble_prob_down),
"model_predictions": {
name: "UP" if pred == 1 else "DOWN"
for name, pred in predictions.items()
},
"model_probabilities": {
name: {"up": float(prob[1] if len(prob) > 1 else 0.5), "down": float(prob[0])}
for name, prob in probabilities.items()
},
"market_conditions": market_conditions,
"timestamp": datetime.now().isoformat(),
"interval": interval,
"data_quality": {
"samples_used": len(data),
"features_count": len(feature_names),
"has_sufficient_data": len(data) >= 100
},
"model_weights": weights if 'weights' in locals() else {},
"cv_scores": cv_scores,
"confidence_breakdown": {
"probability_confidence": float(conf_prob),
"model_agreement": float(model_agreement),
"cv_confidence": float(cv_confidence),
"combined_confidence": float(combined_confidence),
"agreeing_models": agreement_count,
"total_models": len(predictions)
}
}
except Exception as e:
logger.error(f"Error predicting for {symbol}: {e}")
raise
def get_crypto_news_search_query(self, symbol: str = "bitcoin", limit: int = 5) -> Dict[str, Any]:
"""Generate optimized search query for crypto news using Claude Code's WebSearch"""
try:
# Normalize symbol for news queries: strip the USDT quote suffix first so tickers
# like "BTCUSDT" reduce to "btc" before being mapped to full names
news_symbol = symbol.upper().replace("USDT", "").lower()
if news_symbol == "btc":
news_symbol = "bitcoin"
elif news_symbol == "eth":
news_symbol = "ethereum"
# Create optimized search query
search_query = f"{news_symbol} cryptocurrency news today price analysis market sentiment"
return {
"symbol": news_symbol,
"search_query": search_query,
"suggested_domains": ["coindesk.com", "cointelegraph.com", "decrypt.co", "bitcoinmagazine.com", "cryptonews.com"],
"search_instructions": f"Search for recent news about {news_symbol} and analyze market sentiment",
"analysis_prompt": f"Analyze the sentiment and market impact of recent {news_symbol} news. Focus on: 1) Overall sentiment (bullish/bearish/neutral), 2) Key events affecting price, 3) Market impact assessment, 4) Trading implications",
"timestamp": datetime.now().isoformat(),
"status": "ready_for_websearch",
"note": "Use this query with Claude Code's WebSearch tool for real-time news analysis"
}
except Exception as e:
logger.error(f"Error preparing crypto news search: {e}")
return {
"error": str(e),
"symbol": symbol,
"timestamp": datetime.now().isoformat()
}
def _get_multi_timeframe_trend(self, df_current: pd.DataFrame, current_interval: str) -> Dict[str, Any]:
"""Get trend context from current timeframe analysis"""
try:
# Calculate trend strength over multiple periods
short_trend = (df_current['close'].iloc[-5:].mean() - df_current['close'].iloc[-10:-5].mean()) / df_current['close'].iloc[-10:-5].mean()
medium_trend = (df_current['close'].iloc[-10:].mean() - df_current['close'].iloc[-20:-10].mean()) / df_current['close'].iloc[-20:-10].mean()
long_trend = (df_current['close'].iloc[-20:].mean() - df_current['close'].iloc[-40:-20].mean()) / df_current['close'].iloc[-40:-20].mean()
return {
"short_term_trend": "bullish" if short_trend > 0.01 else "bearish" if short_trend < -0.01 else "neutral",
"medium_term_trend": "bullish" if medium_trend > 0.02 else "bearish" if medium_trend < -0.02 else "neutral",
"long_term_trend": "bullish" if long_trend > 0.03 else "bearish" if long_trend < -0.03 else "neutral",
"trend_alignment": all([short_trend > 0, medium_trend > 0, long_trend > 0]) or all([short_trend < 0, medium_trend < 0, long_trend < 0]),
"trend_strength": abs(short_trend) + abs(medium_trend) + abs(long_trend)
}
except Exception:
return {
"short_term_trend": "neutral",
"medium_term_trend": "neutral",
"long_term_trend": "neutral",
"trend_alignment": False,
"trend_strength": 0
}
def _get_dynamic_thresholds(self, volatility: float) -> Dict[str, float]:
"""Calculate dynamic thresholds based on current volatility"""
# Normalize volatility (assume 2% is average)
vol_normalized = max(0.5, min(2.0, volatility / 0.02))
return {
"rsi_oversold": max(20, 30 * (2 - vol_normalized)), # More sensitive in low vol
"rsi_overbought": min(80, 70 * vol_normalized), # Less sensitive in high vol
"volume_breakout": 1.5 * vol_normalized, # Higher threshold in volatile markets
"momentum_threshold": 0.01 * vol_normalized # Volatility-adjusted momentum
}
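# Illustrative thresholds: volatility = 0.03 -> vol_normalized = 1.5, giving
# rsi_oversold = 20, rsi_overbought = 80, volume_breakout = 2.25, momentum_threshold = 0.015.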
def _detect_market_structure(self, df: pd.DataFrame) -> Dict[str, Any]:
"""Analyze market structure and phase"""
try:
# Calculate price swings
highs = df['high'].rolling(5, center=True).max()
lows = df['low'].rolling(5, center=True).min()
# Detect ranging vs trending
price_range = (df['high'].rolling(20).max() - df['low'].rolling(20).min()).iloc[-1]
avg_true_range = df['atr'].iloc[-1] if 'atr' in df.columns else price_range * 0.1
range_efficiency = price_range / (20 * avg_true_range) if avg_true_range > 0 else 1
# Support/Resistance strength
current_price = df['close'].iloc[-1]
support_touches = sum(abs(df['low'].iloc[-20:] - df['support_level'].iloc[-1]) < avg_true_range)
resistance_touches = sum(abs(df['high'].iloc[-20:] - df['resistance_level'].iloc[-1]) < avg_true_range)
return {
"market_phase": "trending" if range_efficiency > 0.6 else "ranging",
"support_strength": min(10, support_touches),
"resistance_strength": min(10, resistance_touches),
"breakout_probability": max(0.1, min(0.9, (support_touches + resistance_touches) / 20)),
"range_efficiency": float(range_efficiency)
}
except Exception:
return {
"market_phase": "unknown",
"support_strength": 0,
"resistance_strength": 0,
"breakout_probability": 0.5,
"range_efficiency": 0.5
}
def _calculate_confluence_score(self, signals: Dict[str, bool], thresholds: Dict[str, float]) -> Dict[str, Any]:
"""Calculate confluence score based on multiple aligned signals"""
# Weight different signal types
signal_weights = {
'trend_signals': ['bullish_momentum', 'above_ma_20', 'trend_alignment'],
'momentum_signals': ['rsi_oversold', 'rsi_overbought', 'momentum_breakout'],
'volume_signals': ['high_volume', 'volume_breakout'],
'structure_signals': ['near_support', 'near_resistance', 'breakout_setup']
}
bullish_score = 0
bearish_score = 0
total_signals = 0
for category, signal_list in signal_weights.items():
for signal in signal_list:
if signal in signals:
total_signals += 1
if signal in ['rsi_oversold', 'near_support', 'bullish_momentum', 'above_ma_20', 'high_volume']:
if signals[signal]:
bullish_score += 1
elif signal in ['rsi_overbought', 'near_resistance', 'bearish_momentum']:
if signals[signal]:
bearish_score += 1
net_score = (bullish_score - bearish_score) / max(total_signals, 1)
return {
"confluence_score": float(net_score),
"bullish_signals": bullish_score,
"bearish_signals": bearish_score,
"signal_strength": "STRONG" if abs(net_score) > 0.6 else "MODERATE" if abs(net_score) > 0.3 else "WEAK",
"bias": "BULLISH" if net_score > 0.2 else "BEARISH" if net_score < -0.2 else "NEUTRAL"
}
def _get_session_context(self) -> Dict[str, str]:
"""Get current trading session context"""
try:
utc_now = datetime.now(timezone.utc)
hour = utc_now.hour
# Trading sessions (UTC)
if 0 <= hour < 8:
return {"session": "Asian", "session_character": "Lower volatility, range-bound"}
elif 8 <= hour < 16:
return {"session": "European", "session_character": "Medium volatility, trending"}
else:
return {"session": "US", "session_character": "Higher volatility, breakouts"}
except Exception:
return {"session": "Unknown", "session_character": "Normal"}
async def fetch_trader_activities(self, trader_address: str, limit: int = 200) -> Dict[str, Any]:
"""Fetch Polymarket trader activities for analysis"""
try:
url = "https://data-api.polymarket.com/activity"
params = {
'user': trader_address,
'limit': limit,
'sortBy': 'TIMESTAMP',
'sortDirection': 'DESC'
}
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
'Accept': 'application/json',
}
response = await self.client.get(url, params=params, headers=headers)
response.raise_for_status()
data = response.json()
activities = data if isinstance(data, list) else data.get('data', [])
# Group activities by slug with filtered content
grouped_activities = {}
for activity in activities:
slug = activity.get("slug")
if slug not in grouped_activities:
grouped_activities[slug] = []
filtered_activity = {
"side": activity.get("side"), # BUY or SELL
"size": activity.get("size"), # Number of shares
"usdcSize": activity.get("usdcSize"), # USD amount
"price": activity.get("price"), # Price per share
"outcome": activity.get("outcome"), # Up/Down/Yes/No
}
grouped_activities[slug].append(filtered_activity)
return grouped_activities
except Exception as e:
return {
"error": str(e),
"trader_address": trader_address,
"timestamp": datetime.now().isoformat()
}
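# Illustrative shape of the grouped result (slug and values are hypothetical):
# {"bitcoin-up-or-down-june-5": [
#     {"side": "BUY", "size": 120.0, "usdcSize": 54.0, "price": 0.45, "outcome": "Up"},
#     ...]}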
# Enhanced feature columns definition - optimized set with new patterns
FEATURE_COLUMNS = [
# Core price momentum (keep most informative)
'price_change', 'volatility', 'momentum_5',
# Moving averages (reduced set - keep key levels only)
'sma_20', 'ema_12', 'price_vs_sma20',
# MACD (keep histogram as most informative)
'macd_histogram',
# Oscillators (keep RSI-14 and one stochastic)
'rsi_14', 'stoch_k', 'williams_r',
# Bollinger Bands (keep position, remove width)
'bb_position',
# Volatility measure
'atr',
# Volume (keep ratio and OBV)
'volume_ratio', 'obv', 'volume_momentum',
# Price patterns (keep most informative)
'close_position', 'body_size',
# Support/Resistance
'distance_to_support', 'distance_to_resistance',
# Market structure
'trend_strength', 'higher_high', 'lower_low',
# Advanced metrics
'sharpe_ratio', 'volatility_regime',
# Enhanced regime and pattern features
'bull_market', 'volatility_breakout', 'doji', 'hammer', 'shooting_star',
'trend_alignment', 'ma_alignment', 'momentum_acceleration', 'rsi_momentum'
]
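# Note: prepare_training_data() silently drops any name listed here that is missing from the
# feature dataframe, and _normalize_features() replaces ratio features such as 'volume_ratio'
# or 'price_vs_sma20' with '<name>_log' columns, so entries only take effect while names stay in sync.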
# Initialize service
crypto_service = CryptoPredictionService()
@mcp.tool()
async def predict_crypto_direction(symbol: str, interval: str = "1h", training_periods: int = 1000) -> str:
"""
Advanced ML prediction: Trains ensemble models to predict if crypto will close UP or DOWN.
Uses a Gradient Boosting + SVM + Random Forest soft-voting ensemble with 30+ technical indicators. Auto-trains if needed.
Args:
symbol: Trading pair symbol (e.g., 'BTCUSDT', 'ETHUSDT')
interval: Time interval for prediction ('1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '6h', '8h', '12h', '1d', '3d', '1w', '1M') - Default: '1h'
training_periods: Number of historical periods to use for training - Default: 1000
Returns: JSON object with ML prediction including:
Prediction: direction (UP/DOWN), confidence, confidence_level, probability_up, probability_down
Model Detail: per-model predictions and probabilities, cross-validation scores, ensemble model_weights
Market Context: current_price, period_open, period_change, price_change_24h, market_conditions
Timing: prediction_window, prediction_for_time, current_time_et, time_remaining
Confidence Breakdown: probability_confidence, model_agreement, cv_confidence, combined_confidence
Data Quality: samples_used, features_count, has_sufficient_data
"""
try:
prediction = await crypto_service.predict_current_hour(symbol, interval, training_periods)
return json.dumps(prediction, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.tool()
async def analyze_crypto_indicators(symbol: str, interval: str = "1h", limit: int = 100) -> str:
"""
Technical analysis: Calculates essential RSI, moving averages, volume, and trend signals.
Fast analysis without ML training - provides immediate market insights.
Args:
symbol: Trading pair symbol (e.g., 'BTCUSDT', 'ETHUSDT')
interval: Time interval ('1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '6h', '8h', '12h', '1d', '3d', '1w', '1M') - Default: '1h'
limit: Number of historical data points to analyze - Default: 100
Returns: JSON object with technical analysis including:
Core Metrics: symbol, current_price, price_change_pct, rsi (with dynamic thresholds), volume_ratio, volatility
Key Moving Averages: ma_20, ema_12, price_vs_ma20
Trend & Structure: multi-timeframe trend_analysis, market_structure, session_info
Signals: RSI/volume/momentum/pattern signals plus a confluence_analysis summary
Support/Resistance: levels, distances, and strength scores
Risk & Data Quality: risk_assessment, periods_analyzed, sufficient_history
"""
try:
data = await crypto_service.get_historical_data(symbol, interval, limit)
# Get current price from raw data (may be incomplete period)
current_price = data[-1]['close'] if data else 0
current_period_data = data[-1] if data else {}
# Filter complete periods for technical indicators
complete_data = crypto_service._filter_complete_periods(data)
if not complete_data:
return json.dumps({"error": "No complete periods available for analysis"}, indent=2)
# Create features from complete data only
df_complete = pd.DataFrame(complete_data)
df_complete = crypto_service.create_features(df_complete)
latest_complete = df_complete.iloc[-1]
# Create current period features if we have incomplete period
if len(data) > len(complete_data):
# There's an incomplete current period
df_current = pd.DataFrame(data)
df_current = crypto_service.create_features(df_current)
current_period_features = df_current.iloc[-1]
else:
# No incomplete period, current = latest complete
current_period_features = latest_complete
# Get current volatility for dynamic thresholds
current_volatility = float(latest_complete.get('volatility', 0.02))
dynamic_thresholds = crypto_service._get_dynamic_thresholds(current_volatility)
# Multi-timeframe trend analysis
trend_analysis = crypto_service._get_multi_timeframe_trend(df_complete, interval)
# Market structure analysis
market_structure = crypto_service._detect_market_structure(df_complete)
# Session context
session_info = crypto_service._get_session_context()
# Basic support/resistance (use complete data)
support_level = float(latest_complete.get('support_level', df_complete['low'].min()))
resistance_level = float(latest_complete.get('resistance_level', df_complete['high'].max()))
# Time info
current_hour_timestamp = current_period_data.get('open_time', int(time.time() * 1000))
time_info = crypto_service._get_prediction_time_window(interval, current_hour_timestamp)
# Enhanced signals with dynamic thresholds
rsi_current = float(latest_complete.get('rsi_14', 50))
volume_current = float(current_period_features.get('volume_ratio', 1))
enhanced_signals = {
# RSI with dynamic thresholds
"rsi_oversold": bool(rsi_current < dynamic_thresholds["rsi_oversold"]),
"rsi_overbought": bool(rsi_current > dynamic_thresholds["rsi_overbought"]),
"rsi_momentum": bool(float(current_period_features.get('rsi_momentum', 0)) > 0),
# Volume analysis
"high_volume": bool(volume_current > 1.5),
"volume_breakout": bool(volume_current > dynamic_thresholds["volume_breakout"]),
"volume_momentum": bool(float(current_period_features.get('volume_momentum', 1)) > 1.2),
# Trend and momentum
"above_ma_20": bool(current_price > float(latest_complete.get('sma_20', current_price))),
"bullish_momentum": bool(float(current_period_features.get('momentum_5', 0)) > dynamic_thresholds["momentum_threshold"]),
"bearish_momentum": bool(float(current_period_features.get('momentum_5', 0)) < -dynamic_thresholds["momentum_threshold"]),
"trend_alignment": bool(current_period_features.get('trend_alignment', 0)),
"ma_alignment": bool(current_period_features.get('ma_alignment', 0)),
# Support/Resistance
"near_support": bool(abs(current_price - support_level) / current_price < 0.02),
"near_resistance": bool(abs(current_price - resistance_level) / current_price < 0.02),
# Pattern recognition
"doji_pattern": bool(current_period_features.get('doji', 0)),
"hammer_pattern": bool(current_period_features.get('hammer', 0)),
"shooting_star": bool(current_period_features.get('shooting_star', 0)),
# Market regime
"bull_market": bool(current_period_features.get('bull_market', 0)),
"volatility_breakout": bool(current_period_features.get('volatility_breakout', 0)),
"breakout_setup": market_structure["breakout_probability"] > 0.7
}
# Calculate confluence score
confluence = crypto_service._calculate_confluence_score(enhanced_signals, dynamic_thresholds)
# Enhanced analysis
analysis = {
"symbol": symbol,
"current_price": float(current_price),
"analysis_time": time_info["current_time"],
# Core indicators with dynamic context
"price_change_pct": float(current_period_features.get('price_change', 0) * 100),
"rsi": rsi_current,
"rsi_dynamic_oversold": dynamic_thresholds["rsi_oversold"],
"rsi_dynamic_overbought": dynamic_thresholds["rsi_overbought"],
"volume_ratio": volume_current,
"volatility": current_volatility,
# Key moving averages
"ma_20": float(latest_complete.get('sma_20', 0)),
"ema_12": float(latest_complete.get('ema_12', 0)),
"price_vs_ma20": float(current_price / latest_complete.get('sma_20', current_price) - 1) if latest_complete.get('sma_20', 0) > 0 else 0,
# Multi-timeframe trend
"trend_analysis": trend_analysis,
# Enhanced momentum
"momentum_5": float(current_period_features.get('momentum_5', 0) * 100),
"momentum_acceleration": float(current_period_features.get('momentum_acceleration', 0)),
# Support/Resistance with strength
"support_level": support_level,
"resistance_level": resistance_level,
"distance_to_support": float((current_price - support_level) / current_price * 100),
"distance_to_resistance": float((resistance_level - current_price) / current_price * 100),
"support_strength": market_structure["support_strength"],
"resistance_strength": market_structure["resistance_strength"],
# Market structure
"market_structure": market_structure,
# Session context
"session_info": session_info,
# Enhanced signals
"signals": enhanced_signals,
# Confluence analysis
"confluence_analysis": confluence,
# Risk assessment
"risk_assessment": {
"volatility_regime": "HIGH" if current_volatility > 0.04 else "MEDIUM" if current_volatility > 0.015 else "LOW",
"market_phase": market_structure["market_phase"],
"breakout_probability": market_structure["breakout_probability"],
"signal_reliability": confluence["signal_strength"]
},
# Data quality
"data_quality": {
"periods_analyzed": len(df_complete),
"sufficient_history": bool(len(df_complete) >= 50),
"features_calculated": len([col for col in df_complete.columns if not col.startswith('open_time')])
}
}
return json.dumps(analysis, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.tool()
async def get_crypto_news_search(symbol: str = "bitcoin") -> str:
"""
Generate optimized search query for cryptocurrency news analysis.
Returns structured data to be used with Claude Code's WebSearch tool.
Args:
symbol: Cryptocurrency symbol (e.g., 'BTCUSDT', 'bitcoin', 'ethereum') - Default: 'bitcoin'
Returns: JSON object with:
Search Query: optimized query for WebSearch tool
Suggested Domains: reliable crypto news sources
Analysis Prompt: structured prompt for sentiment analysis
Search Instructions: how to use the query effectively
Usage: Copy the search_query and use it with Claude Code's WebSearch tool,
then apply the analysis_prompt to the results for market sentiment analysis.
"""
try:
search_data = crypto_service.get_crypto_news_search_query(symbol)
return json.dumps(search_data, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.tool()
async def monitor_polymarket_trader(trader_address: str, limit: int = 100) -> str:
"""
Fetch Polymarket trader activities for behavioral analysis.
Returns recent activity data, grouped by market, to understand trader patterns and strategies.
Args:
trader_address: Ethereum wallet address of the trader to monitor
limit: Number of recent activities to fetch - Default: 100
Returns: JSON object with:
Activities grouped by market slug, each entry containing side (BUY/SELL), size, usdcSize, price, and outcome
Error details (trader address and timestamp) if the request fails
Usage: Analyze the returned activities to understand:
- Trading patterns and preferences (UP/DOWN, YES/NO outcomes)
- Market focus areas and diversification
- Position sizing and risk management
"""
try:
trader_data = await crypto_service.fetch_trader_activities(trader_address, limit)
return json.dumps(trader_data, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
if __name__ == "__main__":
mcp.run()