Skip to main content
Glama
unusual_scanner.py11.3 kB
""" Unusual Activity Scanner (Alpaca Data API) Scans market for unusual trading activity using Alpaca's real-time data. """ import logging from typing import List, Dict, Any, Literal, Optional from datetime import datetime, timedelta from concurrent.futures import ThreadPoolExecutor, as_completed from alpaca.data.historical import StockHistoricalDataClient from alpaca.data.requests import StockBarsRequest, StockLatestQuoteRequest from alpaca.data.timeframe import TimeFrame from config import ALPACA_API_KEY, ALPACA_SECRET_KEY logger = logging.getLogger(__name__) # Cache for market snapshot (5-minute TTL) _CACHE = { "data": None, "timestamp": None, "ttl": 300 # 5 minutes in seconds } # S&P 500 core tickers (top 100 for speed) SP500_CORE = [ "AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "META", "TSLA", "UNH", "XOM", "JN", "JPM", "V", "PG", "MA", "HD", "CVX", "MRK", "ABBV", "PEP", "COST", "AVGO", "KO", "ADBE", "MCD", "CSCO", "ACN", "TMO", "ABT", "LIN", "NFLX", "NKE", "ORCL", "CRM", "DIS", "VZ", "CMCSA", "WMT", "DHR", "INTC", "AMD", "TXN", "QCOM", "PM", "NEE", "UNP", "RTX", "HON", "UPS", "LOW", "SPGI", "INTU", "CAT", "BMY", "BA", "AMGN", "SBUX", "IBM", "GE", "AMAT", "ELV", "DE", "GILD", "PLD", "LMT", "ADI", "ADP", "TJX", "BKNG", "MDLZ", "SYK", "VRTX", "ISRG", "CI", "MMC", "CB", "PGR", "ZTS", "SO", "BLK", "REGN", "NOW", "C", "DUK", "BSX", "SLB", "SCHW", "ETN", "MO", "GS", "TMUS", "EOG", "PNC", "MS", "USB", "LRCX", "HCA", "CME", "ICE", "MMM" ] # Singleton for Alpaca data client _data_client = None def _get_alpaca_client() -> Optional[StockHistoricalDataClient]: """Get or create Alpaca data client singleton.""" global _data_client if _data_client is None: try: if not ALPACA_API_KEY or not ALPACA_SECRET_KEY: logger.error("Alpaca credentials not found in environment") return None _data_client = StockHistoricalDataClient(ALPACA_API_KEY, ALPACA_SECRET_KEY) logger.info("Alpaca data client initialized for market scanner") except Exception as e: logger.error(f"Failed to initialize Alpaca data client: {e}") return None return _data_client def _get_ticker_universe() -> List[str]: """Get universe of tickers to scan.""" return SP500_CORE def _is_cache_valid() -> bool: """Check if cached data is still valid.""" if _CACHE["data"] is None or _CACHE["timestamp"] is None: return False elapsed = (datetime.now() - _CACHE["timestamp"]).total_seconds() return elapsed < _CACHE["ttl"] def _fetch_ticker_data_alpaca(symbol: str) -> Optional[Dict[str, Any]]: """Fetch 5-day data for a single ticker using Alpaca.""" try: data_client = _get_alpaca_client() if not data_client: logger.debug("Alpaca data client not available") return None # Get 5-day bars end_time = datetime.now() start_time = end_time - timedelta(days=7) # Extra days for weekends request = StockBarsRequest( symbol_or_symbols=symbol, timeframe=TimeFrame.Day, start=start_time, end=end_time ) bars = data_client.get_stock_bars(request) if symbol not in bars or len(bars[symbol]) < 2: return None # Convert to list and sort by time bar_list = list(bars[symbol]) bar_list.sort(key=lambda x: x.timestamp) # Get latest and previous day latest = bar_list[-1] previous = bar_list[-2] if len(bar_list) >= 2 else latest # Calculate average volume (exclude today) avg_volume = sum(b.volume for b in bar_list[:-1]) / max(len(bar_list) - 1, 1) # Calculate metrics price_change = ((latest.close - previous.close) / previous.close) * 100 volume_ratio = latest.volume / avg_volume if avg_volume > 0 else 0 return { "symbol": symbol, "price": round(latest.close, 2), "change_pct": round(price_change, 2), "volume": int(latest.volume), "avg_volume": int(avg_volume), "volume_ratio": round(volume_ratio, 2), "high": round(latest.high, 2), "low": round(latest.low, 2), } except Exception as e: logger.debug(f"Error fetching {symbol} from Alpaca: {e}") return None def _fetch_market_snapshot(tickers: List[str], max_workers: int = 10) -> List[Dict]: """Fetch current snapshot for all tickers in parallel using Alpaca.""" # Check cache first if _is_cache_valid(): logger.info("Using cached market snapshot") return _CACHE["data"] logger.info(f"Fetching market snapshot for {len(tickers)} tickers via Alpaca...") results = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_ticker = {executor.submit(_fetch_ticker_data_alpaca, ticker): ticker for ticker in tickers} for future in as_completed(future_to_ticker): data = future.result() if data: results.append(data) # Update cache _CACHE["data"] = results _CACHE["timestamp"] = datetime.now() logger.info(f"Fetched data for {len(results)} tickers from Alpaca") return results def screen_big_movers(min_change: float = 8.0, min_volume: int = 1_000_000, limit: int = 15) -> List[Dict]: """ Find stocks with large price moves (±8%+) on significant volume. Args: min_change: Minimum absolute price change percentage min_volume: Minimum volume threshold limit: Maximum results to return Returns: List of dictionaries with ticker data """ tickers = _get_ticker_universe() data = _fetch_market_snapshot(tickers) # Filter for big movers filtered = [ d for d in data if abs(d['change_pct']) >= min_change and d['volume'] >= min_volume ] # Sort by absolute change filtered.sort(key=lambda x: abs(x['change_pct']), reverse=True) filtered = filtered[:limit] # Add summary for item in filtered: item['summary'] = f"{'+' if item['change_pct'] > 0 else ''}{item['change_pct']:.1f}% on {item['volume_ratio']:.1f}x volume" return filtered def screen_volume_spikes(volume_ratio_min: float = 3.0, limit: int = 15) -> List[Dict]: """ Find stocks with unusual volume (3x+ average). Args: volume_ratio_min: Minimum volume ratio (current / average) limit: Maximum results to return Returns: List of dictionaries with ticker data """ tickers = _get_ticker_universe() data = _fetch_market_snapshot(tickers) # Filter for volume spikes filtered = [d for d in data if d['volume_ratio'] >= volume_ratio_min] # Sort by volume ratio filtered.sort(key=lambda x: x['volume_ratio'], reverse=True) filtered = filtered[:limit] # Add summary for item in filtered: item['summary'] = f"{item['volume_ratio']:.1f}x avg volume ({'+' if item['change_pct'] > 0 else ''}{item['change_pct']:.1f}%)" return filtered def screen_reversal_candidates(dip_min: float = -5.0, dip_max: float = -15.0, min_volume_ratio: float = 1.5, limit: int = 15) -> List[Dict]: """ Find stocks down 5-15% with elevated volume (potential bounce candidates). Args: dip_min: Minimum price decline (e.g., -5%) dip_max: Maximum price decline (e.g., -15%) min_volume_ratio: Minimum volume ratio to confirm interest limit: Maximum results to return Returns: List of dictionaries with ticker data """ tickers = _get_ticker_universe() data = _fetch_market_snapshot(tickers) # Filter for dips with volume filtered = [ d for d in data if dip_max <= d['change_pct'] <= dip_min and d['volume_ratio'] >= min_volume_ratio ] # Sort by volume ratio (higher volume = more conviction) filtered.sort(key=lambda x: x['volume_ratio'], reverse=True) filtered = filtered[:limit] # Add summary for item in filtered: item['summary'] = f"Down {abs(item['change_pct']):.1f}% on {item['volume_ratio']:.1f}x volume" return filtered def scan_unusual_activity( criteria: Literal["big_movers", "volume_spikes", "reversal_candidates"] = "big_movers", limit: int = 15, visualize: bool = False ) -> str: """ Universal market scanner for unusual trading activity using Alpaca data. Args: criteria: Screening template to use limit: Maximum results to return visualize: If True, returns bar chart of results Returns: Formatted text summary or base64-encoded chart """ try: data_client = _get_alpaca_client() if not data_client: return "❌ Alpaca data client not initialized. Check API credentials in .env file." logger.info(f"Running unusual activity scan: {criteria}") # Run appropriate scanner if criteria == "big_movers": results = screen_big_movers(limit=limit) title = f"🔥 Big Movers (±8%+ on volume)" elif criteria == "volume_spikes": results = screen_volume_spikes(limit=limit) title = f"📊 Volume Spikes (3x+ avg volume)" elif criteria == "reversal_candidates": results = screen_reversal_candidates(limit=limit) title = f"🎯 Reversal Candidates (Dip + Volume)" else: return f"Unknown criteria: {criteria}" if not results: return f"No unusual activity found matching '{criteria}' criteria" # Format text output output = [f"\n=== {title} ==="] output.append(f"Found {len(results)} matches (via Alpaca Data API)\n") for i, item in enumerate(results, 1): output.append( f"{i}. {item['symbol']:6} ${item['price']:8.2f} | " f"{item['summary']}" ) result_text = "\n".join(output) # Add visualization if requested if visualize and results: try: from tools.visualizer import plot_bar symbols = [r['symbol'] for r in results] changes = [r['change_pct'] for r in results] chart = plot_bar( symbols, changes, title=title, x_label="Symbol", y_label="Price Change (%)", horizontal=False ) result_text += f"\n\n{chart}" except Exception as e: logger.error(f"Error generating visualization: {e}") result_text += f"\n(Visualization error: {str(e)})" return result_text except Exception as e: logger.error(f"Error in unusual activity scan: {e}", exc_info=True) return f"Error scanning market: {str(e)}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/N-lia/MonteWalk'

If you have feedback or need assistance with the MCP directory API, please join our Discord server