Skip to main content
Glama

TradingView MCP Server

server.py48 kB
from __future__ import annotations import argparse import os from typing import Any, Dict, List, Optional from typing_extensions import TypedDict from mcp.server.fastmcp import FastMCP # Import bollinger band screener modules from tradingview_mcp.core.services.indicators import compute_metrics from tradingview_mcp.core.services.coinlist import load_symbols from tradingview_mcp.core.utils.validators import sanitize_timeframe, sanitize_exchange, EXCHANGE_SCREENER, ALLOWED_TIMEFRAMES try: from tradingview_ta import TA_Handler, get_multiple_analysis TRADINGVIEW_TA_AVAILABLE = True except ImportError: TRADINGVIEW_TA_AVAILABLE = False try: from tradingview_screener import Query from tradingview_screener.column import Column TRADINGVIEW_SCREENER_AVAILABLE = True except ImportError: TRADINGVIEW_SCREENER_AVAILABLE = False class IndicatorMap(TypedDict, total=False): open: Optional[float] close: Optional[float] SMA20: Optional[float] BB_upper: Optional[float] BB_lower: Optional[float] EMA50: Optional[float] RSI: Optional[float] volume: Optional[float] class Row(TypedDict): symbol: str changePercent: float indicators: IndicatorMap class MultiRow(TypedDict): symbol: str changes: dict[str, Optional[float]] base_indicators: IndicatorMap def _map_indicators(raw: Dict[str, Any]) -> IndicatorMap: return IndicatorMap( open=raw.get("open"), close=raw.get("close"), SMA20=raw.get("SMA20"), BB_upper=raw.get("BB.upper") if "BB.upper" in raw else raw.get("BB_upper"), BB_lower=raw.get("BB.lower") if "BB.lower" in raw else raw.get("BB_lower"), EMA50=raw.get("EMA50"), RSI=raw.get("RSI"), volume=raw.get("volume"), ) def _percent_change(o: Optional[float], c: Optional[float]) -> Optional[float]: try: if o in (None, 0) or c is None: return None return (c - o) / o * 100 except Exception: return None def _tf_to_tv_resolution(tf: Optional[str]) -> Optional[str]: if not tf: return None return {"5m": "5", "15m": "15", "1h": "60", "4h": "240", "1D": "1D", "1W": "1W", "1M": "1M"}.get(tf) def _fetch_bollinger_analysis(exchange: str, timeframe: str = "4h", limit: int = 50, bbw_filter: float = None) -> List[Row]: """Fetch analysis using tradingview_ta with bollinger band logic from the original screener.""" if not TRADINGVIEW_TA_AVAILABLE: raise RuntimeError("tradingview_ta is missing; run `uv sync`.") # Load symbols from coinlist files symbols = load_symbols(exchange) if not symbols: raise RuntimeError(f"No symbols found for exchange: {exchange}") # Limit symbols for performance symbols = symbols[:limit * 2] # Get more to filter later # Get screener type based on exchange screener = EXCHANGE_SCREENER.get(exchange, "crypto") try: analysis = get_multiple_analysis(screener=screener, interval=timeframe, symbols=symbols) except Exception as e: raise RuntimeError(f"Analysis failed: {str(e)}") rows: List[Row] = [] for key, value in analysis.items(): try: if value is None: continue indicators = value.indicators metrics = compute_metrics(indicators) if not metrics or metrics.get('bbw') is None: continue # Apply BBW filter if specified if bbw_filter is not None and (metrics['bbw'] >= bbw_filter or metrics['bbw'] <= 0): continue # Check if we have required indicators if not (indicators.get("EMA50") and indicators.get("RSI")): continue rows.append(Row( symbol=key, changePercent=metrics['change'], indicators=IndicatorMap( open=metrics.get('open'), close=metrics.get('price'), SMA20=indicators.get("SMA20"), BB_upper=indicators.get("BB.upper"), BB_lower=indicators.get("BB.lower"), EMA50=indicators.get("EMA50"), RSI=indicators.get("RSI"), volume=indicators.get("volume"), ) )) except (TypeError, ZeroDivisionError, KeyError): continue # Sort by change percentage in descending order (highest gainers first) rows.sort(key=lambda x: x["changePercent"], reverse=True) # Return the requested limit return rows[:limit] def _fetch_trending_analysis(exchange: str, timeframe: str = "5m", filter_type: str = "", rating_filter: int = None, limit: int = 50) -> List[Row]: """Fetch trending coins analysis similar to the original app's trending endpoint.""" if not TRADINGVIEW_TA_AVAILABLE: raise RuntimeError("tradingview_ta is missing; run `uv sync`.") symbols = load_symbols(exchange) if not symbols: raise RuntimeError(f"No symbols found for exchange: {exchange}") # Process symbols in batches due to TradingView API limits batch_size = 200 # Considering API limitations all_coins = [] screener = EXCHANGE_SCREENER.get(exchange, "crypto") # Process symbols in batches for i in range(0, len(symbols), batch_size): batch_symbols = symbols[i:i + batch_size] try: analysis = get_multiple_analysis(screener=screener, interval=timeframe, symbols=batch_symbols) except Exception as e: continue # If this batch fails, move to the next one # Process coins in this batch for key, value in analysis.items(): try: if value is None: continue indicators = value.indicators metrics = compute_metrics(indicators) if not metrics or metrics.get('bbw') is None: continue # Apply rating filter if specified if filter_type == "rating" and rating_filter is not None: if metrics['rating'] != rating_filter: continue all_coins.append(Row( symbol=key, changePercent=metrics['change'], indicators=IndicatorMap( open=metrics.get('open'), close=metrics.get('price'), SMA20=indicators.get("SMA20"), BB_upper=indicators.get("BB.upper"), BB_lower=indicators.get("BB.lower"), EMA50=indicators.get("EMA50"), RSI=indicators.get("RSI"), volume=indicators.get("volume"), ) )) except (TypeError, ZeroDivisionError, KeyError): continue # Sort all coins by change percentage all_coins.sort(key=lambda x: x["changePercent"], reverse=True) return all_coins[:limit] def _fetch_multi_changes(exchange: str, timeframes: List[str] | None, base_timeframe: str = "4h", limit: int | None = None, cookies: Any | None = None) -> List[MultiRow]: try: from tradingview_screener import Query from tradingview_screener.column import Column except Exception as e: raise RuntimeError("tradingview-screener missing; run `uv sync`.") from e tfs = timeframes or ["15m", "1h", "4h", "1D"] suffix_map: dict[str, str] = {} for tf in tfs: s = _tf_to_tv_resolution(tf) if s: suffix_map[tf] = s if not suffix_map: suffix_map = {base_timeframe: _tf_to_tv_resolution(base_timeframe) or "240"} base_suffix = _tf_to_tv_resolution(base_timeframe) or next(iter(suffix_map.values())) cols: list[str] = [] seen: set[str] = set() for tf, s in suffix_map.items(): for c in (f"open|{s}", f"close|{s}"): if c not in seen: cols.append(c) seen.add(c) for c in (f"SMA20|{base_suffix}", f"BB.upper|{base_suffix}", f"BB.lower|{base_suffix}", f"volume|{base_suffix}"): if c not in seen: cols.append(c) seen.add(c) q = Query().set_markets("crypto").select(*cols) if exchange: q = q.where(Column("exchange") == exchange.upper()) if limit: q = q.limit(int(limit)) _total, df = q.get_scanner_data(cookies=cookies) if df is None or df.empty: return [] out: List[MultiRow] = [] for _, r in df.iterrows(): symbol = r.get("ticker") changes: dict[str, Optional[float]] = {} for tf, s in suffix_map.items(): o = r.get(f"open|{s}") c = r.get(f"close|{s}") changes[tf] = _percent_change(o, c) base_ind = IndicatorMap( open=r.get(f"open|{base_suffix}"), close=r.get(f"close|{base_suffix}"), SMA20=r.get(f"SMA20|{base_suffix}"), BB_upper=r.get(f"BB.upper|{base_suffix}"), BB_lower=r.get(f"BB.lower|{base_suffix}"), volume=r.get(f"volume|{base_suffix}"), ) out.append(MultiRow(symbol=symbol, changes=changes, base_indicators=base_ind)) return out mcp = FastMCP( name="TradingView Screener", instructions=("Crypto screener utilities backed by TradingView Screener. Tools: top_gainers, top_losers, multi_changes."), ) @mcp.tool() def top_gainers(exchange: str = "KUCOIN", timeframe: str = "15m", limit: int = 25) -> list[dict]: """Return top gainers for an exchange and timeframe using bollinger band analysis. Args: exchange: Exchange name like KUCOIN, BINANCE, BYBIT, etc. timeframe: One of 5m, 15m, 1h, 4h, 1D, 1W, 1M limit: Number of rows to return (max 50) """ exchange = sanitize_exchange(exchange, "KUCOIN") timeframe = sanitize_timeframe(timeframe, "15m") limit = max(1, min(limit, 50)) rows = _fetch_trending_analysis(exchange, timeframe=timeframe, limit=limit) # Convert Row objects to dicts properly return [{ "symbol": row["symbol"], "changePercent": row["changePercent"], "indicators": dict(row["indicators"]) } for row in rows] @mcp.tool() def top_losers(exchange: str = "KUCOIN", timeframe: str = "15m", limit: int = 25) -> list[dict]: """Return top losers for an exchange and timeframe using bollinger band analysis.""" exchange = sanitize_exchange(exchange, "KUCOIN") timeframe = sanitize_timeframe(timeframe, "15m") limit = max(1, min(limit, 50)) rows = _fetch_trending_analysis(exchange, timeframe=timeframe, limit=limit) # Reverse sort for losers (lowest change first) rows.sort(key=lambda x: x["changePercent"]) # Convert to dict format return [{ "symbol": row["symbol"], "changePercent": row["changePercent"], "indicators": dict(row["indicators"]) } for row in rows[:limit]] @mcp.tool() def bollinger_scan(exchange: str = "KUCOIN", timeframe: str = "4h", bbw_threshold: float = 0.04, limit: int = 50) -> list[dict]: """Scan for coins with low Bollinger Band Width (squeeze detection). Args: exchange: Exchange name like KUCOIN, BINANCE, BYBIT, etc. timeframe: One of 5m, 15m, 1h, 4h, 1D, 1W, 1M bbw_threshold: Maximum BBW value to filter (default 0.04) limit: Number of rows to return (max 100) """ exchange = sanitize_exchange(exchange, "KUCOIN") timeframe = sanitize_timeframe(timeframe, "4h") limit = max(1, min(limit, 100)) rows = _fetch_bollinger_analysis(exchange, timeframe=timeframe, bbw_filter=bbw_threshold, limit=limit) # Convert Row objects to dicts return [{ "symbol": row["symbol"], "changePercent": row["changePercent"], "indicators": dict(row["indicators"]) } for row in rows] @mcp.tool() def rating_filter(exchange: str = "KUCOIN", timeframe: str = "5m", rating: int = 2, limit: int = 25) -> list[dict]: """Filter coins by Bollinger Band rating. Args: exchange: Exchange name like KUCOIN, BINANCE, BYBIT, etc. timeframe: One of 5m, 15m, 1h, 4h, 1D, 1W, 1M rating: BB rating (-3 to +3): -3=Strong Sell, -2=Sell, -1=Weak Sell, 1=Weak Buy, 2=Buy, 3=Strong Buy limit: Number of rows to return (max 50) """ exchange = sanitize_exchange(exchange, "KUCOIN") timeframe = sanitize_timeframe(timeframe, "5m") rating = max(-3, min(3, rating)) limit = max(1, min(limit, 50)) rows = _fetch_trending_analysis(exchange, timeframe=timeframe, filter_type="rating", rating_filter=rating, limit=limit) # Convert Row objects to dicts return [{ "symbol": row["symbol"], "changePercent": row["changePercent"], "indicators": dict(row["indicators"]) } for row in rows] @mcp.tool() def coin_analysis( symbol: str, exchange: str = "KUCOIN", timeframe: str = "15m" ) -> dict: """Get detailed analysis for a specific coin on specified exchange and timeframe. Args: symbol: Coin symbol (e.g., "ACEUSDT", "BTCUSDT") exchange: Exchange name (BINANCE, KUCOIN, etc.) timeframe: Time interval (5m, 15m, 1h, 4h, 1D, 1W, 1M) Returns: Detailed coin analysis with all indicators and metrics """ try: exchange = sanitize_exchange(exchange, "KUCOIN") timeframe = sanitize_timeframe(timeframe, "15m") # Format symbol with exchange prefix if ":" not in symbol: full_symbol = f"{exchange.upper()}:{symbol.upper()}" else: full_symbol = symbol.upper() screener = EXCHANGE_SCREENER.get(exchange, "crypto") try: analysis = get_multiple_analysis( screener=screener, interval=timeframe, symbols=[full_symbol] ) if full_symbol not in analysis or analysis[full_symbol] is None: return { "error": f"No data found for {symbol} on {exchange}", "symbol": symbol, "exchange": exchange, "timeframe": timeframe } data = analysis[full_symbol] indicators = data.indicators # Calculate all metrics metrics = compute_metrics(indicators) if not metrics: return { "error": f"Could not compute metrics for {symbol}", "symbol": symbol, "exchange": exchange, "timeframe": timeframe } # Additional technical indicators macd = indicators.get("MACD.macd", 0) macd_signal = indicators.get("MACD.signal", 0) adx = indicators.get("ADX", 0) stoch_k = indicators.get("Stoch.K", 0) stoch_d = indicators.get("Stoch.D", 0) # Volume analysis volume = indicators.get("volume", 0) # Price levels high = indicators.get("high", 0) low = indicators.get("low", 0) open_price = indicators.get("open", 0) close_price = indicators.get("close", 0) return { "symbol": full_symbol, "exchange": exchange, "timeframe": timeframe, "timestamp": "real-time", "price_data": { "current_price": metrics['price'], "open": round(open_price, 6) if open_price else None, "high": round(high, 6) if high else None, "low": round(low, 6) if low else None, "close": round(close_price, 6) if close_price else None, "change_percent": metrics['change'], "volume": volume }, "bollinger_analysis": { "rating": metrics['rating'], "signal": metrics['signal'], "bbw": metrics['bbw'], "bb_upper": round(indicators.get("BB.upper", 0), 6), "bb_middle": round(indicators.get("SMA20", 0), 6), "bb_lower": round(indicators.get("BB.lower", 0), 6), "position": "Above Upper" if close_price > indicators.get("BB.upper", 0) else "Below Lower" if close_price < indicators.get("BB.lower", 0) else "Within Bands" }, "technical_indicators": { "rsi": round(indicators.get("RSI", 0), 2), "rsi_signal": "Overbought" if indicators.get("RSI", 0) > 70 else "Oversold" if indicators.get("RSI", 0) < 30 else "Neutral", "sma20": round(indicators.get("SMA20", 0), 6), "ema50": round(indicators.get("EMA50", 0), 6), "ema200": round(indicators.get("EMA200", 0), 6), "macd": round(macd, 6), "macd_signal": round(macd_signal, 6), "macd_divergence": round(macd - macd_signal, 6), "adx": round(adx, 2), "trend_strength": "Strong" if adx > 25 else "Weak", "stoch_k": round(stoch_k, 2), "stoch_d": round(stoch_d, 2) }, "market_sentiment": { "overall_rating": metrics['rating'], "buy_sell_signal": metrics['signal'], "volatility": "High" if metrics['bbw'] > 0.05 else "Medium" if metrics['bbw'] > 0.02 else "Low", "momentum": "Bullish" if metrics['change'] > 0 else "Bearish" } } except Exception as e: return { "error": f"Analysis failed: {str(e)}", "symbol": symbol, "exchange": exchange, "timeframe": timeframe } except Exception as e: return { "error": f"Coin analysis failed: {str(e)}", "symbol": symbol, "exchange": exchange, "timeframe": timeframe } @mcp.tool() def consecutive_candles_scan( exchange: str = "KUCOIN", timeframe: str = "15m", pattern_type: str = "bullish", candle_count: int = 3, min_growth: float = 2.0, limit: int = 20 ) -> dict: """Scan for coins with consecutive growing/shrinking candles pattern. Args: exchange: Exchange name (BINANCE, KUCOIN, etc.) timeframe: Time interval (5m, 15m, 1h, 4h) pattern_type: "bullish" (growing candles) or "bearish" (shrinking candles) candle_count: Number of consecutive candles to check (2-5) min_growth: Minimum growth percentage for each candle limit: Maximum number of results to return Returns: List of coins with consecutive candle patterns """ try: exchange = sanitize_exchange(exchange, "KUCOIN") timeframe = sanitize_timeframe(timeframe, "15m") candle_count = max(2, min(5, candle_count)) min_growth = max(0.5, min(20.0, min_growth)) limit = max(1, min(50, limit)) # Get symbols for the exchange symbols = load_symbols(exchange) if not symbols: return { "error": f"No symbols found for exchange: {exchange}", "exchange": exchange, "timeframe": timeframe } # Limit symbols for performance (we need historical data) symbols = symbols[:min(limit * 3, 200)] # We need to get data from multiple timeframes to analyze candle progression # For now, we'll use current timeframe data and simulate pattern detection screener = EXCHANGE_SCREENER.get(exchange, "crypto") try: analysis = get_multiple_analysis( screener=screener, interval=timeframe, symbols=symbols ) pattern_coins = [] for symbol, data in analysis.items(): if data is None: continue try: indicators = data.indicators # Calculate current candle metrics open_price = indicators.get("open") close_price = indicators.get("close") high_price = indicators.get("high") low_price = indicators.get("low") volume = indicators.get("volume", 0) if not all([open_price, close_price, high_price, low_price]): continue # Calculate current candle body size and change current_change = ((close_price - open_price) / open_price) * 100 candle_body = abs(close_price - open_price) candle_range = high_price - low_price body_to_range_ratio = candle_body / candle_range if candle_range > 0 else 0 # For consecutive pattern, we'll use available indicators to simulate # In a real implementation, we'd need historical OHLC data # Use RSI and price momentum as proxy for consecutive pattern rsi = indicators.get("RSI", 50) sma20 = indicators.get("SMA20", close_price) ema50 = indicators.get("EMA50", close_price) # Calculate momentum indicators price_above_sma = close_price > sma20 price_above_ema = close_price > ema50 strong_momentum = abs(current_change) >= min_growth # Pattern detection logic pattern_detected = False pattern_strength = 0 if pattern_type == "bullish": # Bullish pattern: price rising, good momentum, strong candle body conditions = [ current_change > min_growth, # Current candle is bullish body_to_range_ratio > 0.6, # Strong candle body price_above_sma, # Above short MA rsi > 45 and rsi < 80, # RSI in momentum range volume > 1000 # Decent volume ] pattern_strength = sum(conditions) pattern_detected = pattern_strength >= 3 elif pattern_type == "bearish": # Bearish pattern: price falling, bearish momentum conditions = [ current_change < -min_growth, # Current candle is bearish body_to_range_ratio > 0.6, # Strong candle body not price_above_sma, # Below short MA rsi < 55 and rsi > 20, # RSI in bearish range volume > 1000 # Decent volume ] pattern_strength = sum(conditions) pattern_detected = pattern_strength >= 3 if pattern_detected: # Calculate additional metrics metrics = compute_metrics(indicators) coin_data = { "symbol": symbol, "price": round(close_price, 6), "current_change": round(current_change, 3), "candle_body_ratio": round(body_to_range_ratio, 3), "pattern_strength": pattern_strength, "volume": volume, "bollinger_rating": metrics.get('rating', 0) if metrics else 0, "rsi": round(rsi, 2), "price_levels": { "open": round(open_price, 6), "high": round(high_price, 6), "low": round(low_price, 6), "close": round(close_price, 6) }, "momentum_signals": { "above_sma20": price_above_sma, "above_ema50": price_above_ema, "strong_volume": volume > 5000 } } pattern_coins.append(coin_data) except Exception as e: continue # Sort by pattern strength and current change if pattern_type == "bullish": pattern_coins.sort(key=lambda x: (x['pattern_strength'], x['current_change']), reverse=True) else: pattern_coins.sort(key=lambda x: (x['pattern_strength'], -x['current_change']), reverse=True) return { "exchange": exchange, "timeframe": timeframe, "pattern_type": pattern_type, "candle_count": candle_count, "min_growth": min_growth, "total_found": len(pattern_coins), "data": pattern_coins[:limit] } except Exception as e: return { "error": f"Pattern analysis failed: {str(e)}", "exchange": exchange, "timeframe": timeframe } except Exception as e: return { "error": f"Consecutive candles scan failed: {str(e)}", "exchange": exchange, "timeframe": timeframe } @mcp.tool() def advanced_candle_pattern( exchange: str = "KUCOIN", base_timeframe: str = "15m", pattern_length: int = 3, min_size_increase: float = 10.0, limit: int = 15 ) -> dict: """Advanced candle pattern analysis using multi-timeframe data. Args: exchange: Exchange name (BINANCE, KUCOIN, etc.) base_timeframe: Base timeframe for analysis (5m, 15m, 1h, 4h) pattern_length: Number of consecutive periods to analyze (2-4) min_size_increase: Minimum percentage increase in candle size limit: Maximum number of results to return Returns: Coins with progressive candle size increase patterns """ try: exchange = sanitize_exchange(exchange, "KUCOIN") base_timeframe = sanitize_timeframe(base_timeframe, "15m") pattern_length = max(2, min(4, pattern_length)) min_size_increase = max(5.0, min(50.0, min_size_increase)) limit = max(1, min(30, limit)) # Get symbols symbols = load_symbols(exchange) if not symbols: return { "error": f"No symbols found for exchange: {exchange}", "exchange": exchange } # Limit for performance symbols = symbols[:min(limit * 2, 100)] # Use tradingview-screener for multi-timeframe data if available if TRADINGVIEW_SCREENER_AVAILABLE: try: # Get multiple timeframe data using screener results = _fetch_multi_timeframe_patterns( exchange, symbols, base_timeframe, pattern_length, min_size_increase ) return { "exchange": exchange, "base_timeframe": base_timeframe, "pattern_length": pattern_length, "min_size_increase": min_size_increase, "method": "multi-timeframe", "total_found": len(results), "data": results[:limit] } except Exception as e: # Fallback to single timeframe analysis pass # Fallback: Use single timeframe with enhanced pattern detection screener = EXCHANGE_SCREENER.get(exchange, "crypto") analysis = get_multiple_analysis( screener=screener, interval=base_timeframe, symbols=symbols ) pattern_results = [] for symbol, data in analysis.items(): if data is None: continue try: indicators = data.indicators # Enhanced pattern detection using available indicators pattern_score = _calculate_candle_pattern_score( indicators, pattern_length, min_size_increase ) if pattern_score['detected']: metrics = compute_metrics(indicators) result = { "symbol": symbol, "pattern_score": pattern_score['score'], "pattern_details": pattern_score['details'], "current_price": pattern_score['price'], "total_change": pattern_score['total_change'], "volume": indicators.get("volume", 0), "bollinger_rating": metrics.get('rating', 0) if metrics else 0, "technical_strength": { "rsi": round(indicators.get("RSI", 50), 2), "momentum": "Strong" if abs(pattern_score['total_change']) > min_size_increase else "Moderate", "volume_trend": "High" if indicators.get("volume", 0) > 10000 else "Low" } } pattern_results.append(result) except Exception as e: continue # Sort by pattern score and total change pattern_results.sort(key=lambda x: (x['pattern_score'], abs(x['total_change'])), reverse=True) return { "exchange": exchange, "base_timeframe": base_timeframe, "pattern_length": pattern_length, "min_size_increase": min_size_increase, "method": "enhanced-single-timeframe", "total_found": len(pattern_results), "data": pattern_results[:limit] } except Exception as e: return { "error": f"Advanced pattern analysis failed: {str(e)}", "exchange": exchange, "base_timeframe": base_timeframe } def _calculate_candle_pattern_score(indicators: dict, pattern_length: int, min_increase: float) -> dict: """Calculate candle pattern score based on available indicators.""" try: open_price = indicators.get("open", 0) close_price = indicators.get("close", 0) high_price = indicators.get("high", 0) low_price = indicators.get("low", 0) volume = indicators.get("volume", 0) rsi = indicators.get("RSI", 50) if not all([open_price, close_price, high_price, low_price]): return {"detected": False, "score": 0} # Current candle analysis candle_body = abs(close_price - open_price) candle_range = high_price - low_price body_ratio = candle_body / candle_range if candle_range > 0 else 0 # Price change price_change = ((close_price - open_price) / open_price) * 100 # Pattern scoring score = 0 details = [] # Strong candle body if body_ratio > 0.7: score += 2 details.append("Strong candle body") elif body_ratio > 0.5: score += 1 details.append("Moderate candle body") # Significant price movement if abs(price_change) >= min_increase: score += 2 details.append(f"Strong momentum ({price_change:.1f}%)") elif abs(price_change) >= min_increase / 2: score += 1 details.append(f"Moderate momentum ({price_change:.1f}%)") # Volume confirmation if volume > 5000: score += 1 details.append("Good volume") # RSI momentum if (price_change > 0 and 50 < rsi < 80) or (price_change < 0 and 20 < rsi < 50): score += 1 details.append("RSI momentum aligned") # Trend consistency (using EMA vs price) ema50 = indicators.get("EMA50", close_price) if (price_change > 0 and close_price > ema50) or (price_change < 0 and close_price < ema50): score += 1 details.append("Trend alignment") detected = score >= 3 # Minimum threshold return { "detected": detected, "score": score, "details": details, "price": round(close_price, 6), "total_change": round(price_change, 3), "body_ratio": round(body_ratio, 3), "volume": volume } except Exception as e: return {"detected": False, "score": 0, "error": str(e)} def _fetch_multi_timeframe_patterns(exchange: str, symbols: List[str], base_tf: str, length: int, min_increase: float) -> List[dict]: """Fetch multi-timeframe pattern data using tradingview-screener.""" try: from tradingview_screener import Query from tradingview_screener.column import Column # Map timeframe to TradingView format tf_map = {"5m": "5", "15m": "15", "1h": "60", "4h": "240", "1D": "1D"} tv_interval = tf_map.get(base_tf, "15") # Create query for OHLC data cols = [ f"open|{tv_interval}", f"close|{tv_interval}", f"high|{tv_interval}", f"low|{tv_interval}", f"volume|{tv_interval}", "RSI" ] q = Query().set_markets("crypto").select(*cols) q = q.where(Column("exchange") == exchange.upper()) q = q.limit(len(symbols)) total, df = q.get_scanner_data() if df is None or df.empty: return [] results = [] for _, row in df.iterrows(): symbol = row.get("ticker", "") try: open_val = row.get(f"open|{tv_interval}") close_val = row.get(f"close|{tv_interval}") high_val = row.get(f"high|{tv_interval}") low_val = row.get(f"low|{tv_interval}") volume_val = row.get(f"volume|{tv_interval}", 0) rsi_val = row.get("RSI", 50) if not all([open_val, close_val, high_val, low_val]): continue # Calculate pattern metrics pattern_score = _calculate_candle_pattern_score({ "open": open_val, "close": close_val, "high": high_val, "low": low_val, "volume": volume_val, "RSI": rsi_val }, length, min_increase) if pattern_score['detected']: results.append({ "symbol": symbol, "pattern_score": pattern_score['score'], "price": pattern_score['price'], "change": pattern_score['total_change'], "body_ratio": pattern_score['body_ratio'], "volume": volume_val, "rsi": round(rsi_val, 2), "details": pattern_score['details'] }) except Exception as e: continue return sorted(results, key=lambda x: x['pattern_score'], reverse=True) except Exception as e: return [] @mcp.resource("exchanges://list") def exchanges_list() -> str: """List available exchanges from coinlist directory.""" try: import os # Get the directory where this module is located current_dir = os.path.dirname(__file__) coinlist_dir = os.path.join(current_dir, "coinlist") if os.path.exists(coinlist_dir): exchanges = [] for filename in os.listdir(coinlist_dir): if filename.endswith('.txt'): exchange_name = filename[:-4].upper() exchanges.append(exchange_name) if exchanges: return f"Available exchanges: {', '.join(sorted(exchanges))}" # Fallback to static list return "Common exchanges: KUCOIN, BINANCE, BYBIT, OKX, COINBASE, GATEIO, HUOBI, BITFINEX, KRAKEN, BITSTAMP, BIST, NASDAQ" except Exception: return "Common exchanges: KUCOIN, BINANCE, BYBIT, OKX, COINBASE, GATEIO, HUOBI, BITFINEX, KRAKEN, BITSTAMP, BIST, NASDAQ" def main() -> None: parser = argparse.ArgumentParser(description="TradingView Screener MCP server") parser.add_argument("transport", choices=["stdio", "streamable-http"], default="stdio", nargs="?", help="Transport (default stdio)") parser.add_argument("--host", default=os.environ.get("HOST", "127.0.0.1")) parser.add_argument("--port", type=int, default=int(os.environ.get("PORT", "8000"))) args = parser.parse_args() if os.environ.get("DEBUG_MCP"): import sys print(f"[DEBUG_MCP] pkg cwd={os.getcwd()} argv={sys.argv} file={__file__}", file=sys.stderr, flush=True) if args.transport == "stdio": mcp.run() else: try: mcp.settings.host = args.host mcp.settings.port = args.port except Exception: pass mcp.run(transport="streamable-http") @mcp.tool() def volume_breakout_scanner(exchange: str = "KUCOIN", timeframe: str = "15m", volume_multiplier: float = 2.0, price_change_min: float = 3.0, limit: int = 25) -> list[dict]: """Detect coins with volume breakout + price breakout. Args: exchange: Exchange name like KUCOIN, BINANCE, BYBIT, etc. timeframe: One of 5m, 15m, 1h, 4h, 1D, 1W, 1M volume_multiplier: How many times the volume should be above normal level (default 2.0) price_change_min: Minimum price change percentage (default 3.0) limit: Number of rows to return (max 50) """ exchange = sanitize_exchange(exchange, "KUCOIN") timeframe = sanitize_timeframe(timeframe, "15m") volume_multiplier = max(1.5, min(10.0, volume_multiplier)) price_change_min = max(1.0, min(20.0, price_change_min)) limit = max(1, min(limit, 50)) # Get symbols symbols = load_symbols(exchange) if not symbols: return [] screener = EXCHANGE_SCREENER.get(exchange, "crypto") volume_breakouts = [] # Process in batches batch_size = 100 for i in range(0, min(len(symbols), 500), batch_size): # Limit to 500 symbols for performance batch_symbols = symbols[i:i + batch_size] try: analysis = get_multiple_analysis(screener=screener, interval=timeframe, symbols=batch_symbols) except Exception: continue for symbol, data in analysis.items(): try: if not data or not hasattr(data, 'indicators'): continue indicators = data.indicators # Get required data volume = indicators.get('volume', 0) close = indicators.get('close', 0) open_price = indicators.get('open', 0) sma20_volume = indicators.get('volume.SMA20', 0) # 20-period volume average if not all([volume, close, open_price]) or volume <= 0: continue # Calculate price change % price_change = ((close - open_price) / open_price) * 100 if open_price > 0 else 0 # Volume ratio calculation # If SMA20 volume not available, use a simple heuristic if sma20_volume and sma20_volume > 0: volume_ratio = volume / sma20_volume else: # Estimate average volume as current volume / 2 (conservative) avg_volume_estimate = volume / 2 volume_ratio = volume / avg_volume_estimate if avg_volume_estimate > 0 else 1 # Check conditions if (abs(price_change) >= price_change_min and volume_ratio >= volume_multiplier): # Get additional indicators rsi = indicators.get('RSI', 50) bb_upper = indicators.get('BB.upper', 0) bb_lower = indicators.get('BB.lower', 0) # Volume strength score volume_strength = min(10, volume_ratio) # Cap at 10x volume_breakouts.append({ "symbol": symbol, "changePercent": price_change, "volume_ratio": round(volume_ratio, 2), "volume_strength": round(volume_strength, 1), "current_volume": volume, "breakout_type": "bullish" if price_change > 0 else "bearish", "indicators": { "close": close, "RSI": rsi, "BB_upper": bb_upper, "BB_lower": bb_lower, "volume": volume } }) except Exception: continue # Sort by volume strength first, then by price change volume_breakouts.sort(key=lambda x: (x["volume_strength"], abs(x["changePercent"])), reverse=True) return volume_breakouts[:limit] @mcp.tool() def volume_confirmation_analysis(symbol: str, exchange: str = "KUCOIN", timeframe: str = "15m") -> dict: """Detailed volume confirmation analysis for a specific coin. Args: symbol: Coin symbol (e.g., BTCUSDT) exchange: Exchange name timeframe: Time frame for analysis """ exchange = sanitize_exchange(exchange, "KUCOIN") timeframe = sanitize_timeframe(timeframe, "15m") if not symbol.upper().endswith('USDT'): symbol = symbol.upper() + 'USDT' screener = EXCHANGE_SCREENER.get(exchange, "crypto") try: analysis = get_multiple_analysis(screener=screener, interval=timeframe, symbols=[symbol]) if not analysis or symbol not in analysis: return {"error": f"No data found for {symbol}"} data = analysis[symbol] if not data or not hasattr(data, 'indicators'): return {"error": f"No indicator data for {symbol}"} indicators = data.indicators # Get volume data volume = indicators.get('volume', 0) close = indicators.get('close', 0) open_price = indicators.get('open', 0) high = indicators.get('high', 0) low = indicators.get('low', 0) # Calculate price metrics price_change = ((close - open_price) / open_price) * 100 if open_price > 0 else 0 candle_range = ((high - low) / low) * 100 if low > 0 else 0 # Volume analysis sma20_volume = indicators.get('volume.SMA20', 0) volume_ratio = volume / sma20_volume if sma20_volume > 0 else 1 # Technical indicators rsi = indicators.get('RSI', 50) bb_upper = indicators.get('BB.upper', 0) bb_lower = indicators.get('BB.lower', 0) bb_middle = (bb_upper + bb_lower) / 2 if bb_upper and bb_lower else close # Volume confirmation signals signals = [] # Strong volume + price breakout if volume_ratio >= 2.0 and abs(price_change) >= 3.0: signals.append(f"🚀 STRONG BREAKOUT: {volume_ratio:.1f}x volume + {price_change:.1f}% price") # Volume divergence if volume_ratio >= 1.5 and abs(price_change) < 1.0: signals.append(f"⚠️ VOLUME DIVERGENCE: High volume ({volume_ratio:.1f}x) but low price movement") # Low volume on price move (weak signal) if abs(price_change) >= 2.0 and volume_ratio < 0.8: signals.append(f"❌ WEAK SIGNAL: Price moved but volume is low ({volume_ratio:.1f}x)") # Bollinger Band + Volume confirmation if close > bb_upper and volume_ratio >= 1.5: signals.append(f"💥 BB BREAKOUT CONFIRMED: Upper band breakout + volume confirmation") elif close < bb_lower and volume_ratio >= 1.5: signals.append(f"📉 BB SELL CONFIRMED: Lower band breakout + volume confirmation") # RSI + Volume analysis if rsi > 70 and volume_ratio >= 2.0: signals.append(f"🔥 OVERBOUGHT + VOLUME: RSI {rsi:.1f} + {volume_ratio:.1f}x volume") elif rsi < 30 and volume_ratio >= 2.0: signals.append(f"🛒 OVERSOLD + VOLUME: RSI {rsi:.1f} + {volume_ratio:.1f}x volume") # Overall assessment if volume_ratio >= 3.0: volume_strength = "VERY STRONG" elif volume_ratio >= 2.0: volume_strength = "STRONG" elif volume_ratio >= 1.5: volume_strength = "MEDIUM" elif volume_ratio >= 1.0: volume_strength = "NORMAL" else: volume_strength = "WEAK" return { "symbol": symbol, "price_data": { "close": close, "change_percent": round(price_change, 2), "candle_range_percent": round(candle_range, 2) }, "volume_analysis": { "current_volume": volume, "volume_ratio": round(volume_ratio, 2), "volume_strength": volume_strength, "average_volume": sma20_volume }, "technical_indicators": { "RSI": round(rsi, 1), "BB_position": "ABOVE" if close > bb_upper else "BELOW" if close < bb_lower else "WITHIN", "BB_upper": bb_upper, "BB_lower": bb_lower }, "signals": signals, "overall_assessment": { "bullish_signals": len([s for s in signals if "🚀" in s or "💥" in s or "🛒" in s]), "bearish_signals": len([s for s in signals if "📉" in s or "❌" in s]), "warning_signals": len([s for s in signals if "⚠️" in s]) } } except Exception as e: return {"error": f"Analysis failed: {str(e)}"} @mcp.tool() def smart_volume_scanner(exchange: str = "KUCOIN", min_volume_ratio: float = 2.0, min_price_change: float = 2.0, rsi_range: str = "any", limit: int = 20) -> list[dict]: """Smart volume + technical analysis combination scanner. Args: exchange: Exchange name min_volume_ratio: Minimum volume multiplier (default 2.0) min_price_change: Minimum price change percentage (default 2.0) rsi_range: "oversold" (<30), "overbought" (>70), "neutral" (30-70), "any" limit: Number of results (max 30) """ exchange = sanitize_exchange(exchange, "KUCOIN") min_volume_ratio = max(1.2, min(10.0, min_volume_ratio)) min_price_change = max(0.5, min(20.0, min_price_change)) limit = max(1, min(limit, 30)) # Get volume breakouts first volume_breakouts = volume_breakout_scanner( exchange=exchange, volume_multiplier=min_volume_ratio, price_change_min=min_price_change, limit=limit * 2 # Get more to filter ) if not volume_breakouts: return [] # Apply RSI filter filtered_results = [] for coin in volume_breakouts: rsi = coin["indicators"].get("RSI", 50) if rsi_range == "oversold" and rsi >= 30: continue elif rsi_range == "overbought" and rsi <= 70: continue elif rsi_range == "neutral" and (rsi <= 30 or rsi >= 70): continue # "any" passes all # Add trading recommendation recommendation = "" if coin["changePercent"] > 0 and coin["volume_ratio"] >= 2.0: if rsi < 70: recommendation = "🚀 STRONG BUY" else: recommendation = "⚠️ OVERBOUGHT - CAUTION" elif coin["changePercent"] < 0 and coin["volume_ratio"] >= 2.0: if rsi > 30: recommendation = "📉 STRONG SELL" else: recommendation = "🛒 OVERSOLD - OPPORTUNITY?" coin["trading_recommendation"] = recommendation filtered_results.append(coin) return filtered_results[:limit] if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/patch-ridermg48/tradingview-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server