We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""
Enhanced stock data provider with SQLAlchemy integration and screening recommendations.
Provides comprehensive stock data retrieval with database caching and maverick screening.
"""
# Suppress specific pyright warnings for pandas operations
# pyright: reportOperatorIssue=false
import logging
from datetime import UTC, datetime, timedelta
import pandas as pd
import pandas_market_calendars as mcal
import pytz
import yfinance as yf
from dotenv import load_dotenv
from sqlalchemy import text
from sqlalchemy.orm import Session
from maverick_mcp.data.models import (
MaverickBearStocks,
MaverickStocks,
PriceCache,
SessionLocal,
Stock,
SupplyDemandBreakoutStocks,
bulk_insert_price_data,
get_latest_maverick_screening,
)
from maverick_mcp.data.session_management import get_db_session_read_only
from maverick_mcp.utils.circuit_breaker_decorators import (
with_stock_data_circuit_breaker,
)
from maverick_mcp.utils.yfinance_pool import get_yfinance_pool
# Load environment variables through python-dotenv before anything else runs
load_dotenv()

# Process-wide logging setup: INFO level with timestamp, logger name and level
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Logger used by every function and method in this module
logger = logging.getLogger("maverick_mcp.stock_data")
class EnhancedStockDataProvider:
"""
Enhanced provider for stock data with database caching and screening recommendations.
"""
def __init__(self, db_session: Session | None = None):
    """
    Set up defaults, the NYSE calendar and the yfinance pool, then probe the database.

    Args:
        db_session: Optional database session for dependency injection.
            When omitted, sessions are obtained on demand.
    """
    # Default settings; cache_days is the cache lifetime in days
    self.timeout, self.max_retries = 30, 3
    self.cache_days = 1

    # NYSE calendar drives the trading-day helpers on this class
    self.market_calendar = mcal.get_calendar("NYSE")

    self._db_session = db_session

    # Shared yfinance connection pool
    self._yf_pool = get_yfinance_pool()

    # Connectivity probe: both helpers only log on failure
    if not db_session:
        self._test_db_connection()
        return
    self._test_db_connection_with_session(db_session)
def _test_db_connection(self):
    """Probe the database with a trivial query; log the outcome and never raise."""
    try:
        # The read-only context manager owns the session lifecycle
        with get_db_session_read_only() as probe_session:
            probe_session.execute(text("SELECT 1")).fetchone()
            logger.info("Database connection successful")
    except Exception as e:
        logger.warning(
            f"Database connection test failed: {e}. Caching will be disabled."
        )
def _test_db_connection_with_session(self, session: Session):
    """Run a trivial query on the supplied session; log the outcome and never raise."""
    try:
        session.execute(text("SELECT 1")).fetchone()
    except Exception as e:
        logger.warning(
            f"Database session test failed: {e}. Caching may not work properly."
        )
    else:
        logger.info("Database session test successful")
def _get_data_with_smart_cache(
    self, symbol: str, start_date: str, end_date: str, interval: str
) -> pd.DataFrame:
    """
    Get stock data using smart caching strategy.

    This method:
    1. Gets all available data from cache
    2. Identifies missing date ranges (before and after the cached span)
    3. Fetches only missing data from yfinance and caches it
    4. Combines and returns the complete dataset

    Args:
        symbol: Stock ticker symbol (uppercased internally)
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format
        interval: Data interval passed through to the yfinance fetch

    Returns:
        DataFrame with the stock data. On the cached paths the index is
        timezone-naive and filtered to [start_date, end_date]; on the
        no-cache path the fetched frame is returned as-is.
    """
    symbol = symbol.upper()
    session, should_close = self._get_db_session()

    try:
        # Step 1: Get ALL available cached data for the date range
        logger.info(f"Checking cache for {symbol} from {start_date} to {end_date}")
        cached_df = self._get_cached_data_flexible(
            session, symbol, start_date, end_date
        )

        # Convert dates for comparison - ensure timezone-naive for consistency
        start_dt = pd.to_datetime(start_date).tz_localize(None)
        end_dt = pd.to_datetime(end_date).tz_localize(None)

        if cached_df is None or cached_df.empty:
            # No cached data, fetch everything but only for trading days
            logger.info(
                f"No cached data found for {symbol}, fetching from yfinance"
            )

            # Adjust dates to trading days
            trading_days = self._get_trading_days(start_date, end_date)
            if len(trading_days) == 0:
                logger.warning(
                    f"No trading days found between {start_date} and {end_date}"
                )
                return pd.DataFrame(
                    columns=[  # type: ignore[arg-type]
                        "Open",
                        "High",
                        "Low",
                        "Close",
                        "Volume",
                        "Dividends",
                        "Stock Splits",
                    ]
                )

            # Fetch data only for the trading day range
            # NOTE(review): yfinance documents `end` as exclusive; confirm the
            # pool compensates, otherwise the last trading day is not fetched.
            fetch_start = trading_days[0].strftime("%Y-%m-%d")
            fetch_end = trading_days[-1].strftime("%Y-%m-%d")
            logger.info(
                f"Fetching data for trading days: {fetch_start} to {fetch_end}"
            )
            df = self._fetch_stock_data_from_yfinance(
                symbol, fetch_start, fetch_end, None, interval
            )
            if not df.empty:
                # Ensure stock exists and cache the data
                self._get_or_create_stock(session, symbol)
                self._cache_price_data(session, symbol, df)
            return df

        # Step 2: Determine what data we still need
        logger.info(f"Found {len(cached_df)} cached records for {symbol}")

        # Span covered by the cache - timezone-naive for comparison
        cached_start = pd.to_datetime(cached_df.index.min()).tz_localize(None)
        cached_end = pd.to_datetime(cached_df.index.max()).tz_localize(None)

        missing_ranges = []

        # Missing data at the beginning?
        if start_dt < cached_start:
            missing_start_trading = self._get_trading_days(
                start_dt, cached_start - timedelta(days=1)
            )
            # Only request data if there are trading days in the gap
            if len(missing_start_trading) > 0:
                missing_ranges.append(
                    (
                        missing_start_trading[0].strftime("%Y-%m-%d"),
                        missing_start_trading[-1].strftime("%Y-%m-%d"),
                    )
                )

        # Missing recent data?
        if end_dt > cached_end and self._is_trading_day_between(cached_end, end_dt):
            missing_end_trading = self._get_trading_days(
                cached_end + timedelta(days=1), end_dt
            )
            if len(missing_end_trading) > 0:
                missing_ranges.append(
                    (
                        missing_end_trading[0].strftime("%Y-%m-%d"),
                        missing_end_trading[-1].strftime("%Y-%m-%d"),
                    )
                )

        # If no missing data, return cached data
        if not missing_ranges:
            logger.info(
                f"Cache hit! Returning {len(cached_df)} cached records for {symbol}"
            )
            # Filter to requested range - ensure index is timezone-naive
            cached_df.index = pd.to_datetime(cached_df.index).tz_localize(None)
            mask = (cached_df.index >= start_dt) & (cached_df.index <= end_dt)
            return cached_df.loc[mask]

        # Step 3: Fetch only missing data
        logger.info(f"Cache partial hit. Missing ranges: {missing_ranges}")
        all_dfs = [cached_df]
        for miss_start, miss_end in missing_ranges:
            logger.info(
                f"Fetching missing data for {symbol} from {miss_start} to {miss_end}"
            )
            missing_df = self._fetch_stock_data_from_yfinance(
                symbol, miss_start, miss_end, None, interval
            )
            if missing_df.empty:
                continue

            # The cached frame has a timezone-naive index. A tz-aware fetched
            # index would make the sort below raise when comparing naive and
            # aware timestamps, so normalise a copy before combining.
            normalized_df = missing_df.copy()
            normalized_df.index = pd.to_datetime(normalized_df.index).tz_localize(
                None
            )
            all_dfs.append(normalized_df)

            # Cache the new data exactly as it was fetched
            self._cache_price_data(session, symbol, missing_df)

        # Step 4: Combine, de-duplicate (cached rows win) and filter
        combined_df = pd.concat(all_dfs).sort_index()
        combined_df = combined_df[~combined_df.index.duplicated(keep="first")]
        combined_df.index = pd.to_datetime(combined_df.index).tz_localize(None)
        mask = (combined_df.index >= start_dt) & (combined_df.index <= end_dt)
        return combined_df.loc[mask]
    finally:
        # Only close sessions this method created; injected sessions stay open
        if should_close:
            session.close()
def _get_cached_data_flexible(
    self, session: Session, symbol: str, start_date: str, end_date: str
) -> pd.DataFrame | None:
    """
    Return whatever cached price rows exist inside the requested range.

    The result may cover only part of the range. Columns are renamed to the
    yfinance spelling, price columns become float64, Volume becomes int64,
    and the index is made timezone-naive.

    Args:
        session: Database session
        symbol: Stock ticker symbol
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format

    Returns:
        DataFrame with the available cached data, or None when nothing is
        cached or the lookup fails (failures are logged).
    """
    try:
        frame = PriceCache.get_price_data(session, symbol, start_date, end_date)
        if frame.empty:
            return None

        # yfinance frames carry these columns, so supply zero-filled defaults
        for extra in ("Dividends", "Stock Splits"):
            if extra not in frame.columns:
                frame[extra] = 0.0

        # Lower-case database names -> yfinance capitalisation
        frame.rename(
            columns={
                "open": "Open",
                "high": "High",
                "low": "Low",
                "close": "Close",
                "volume": "Volume",
            },
            inplace=True,
        )

        # Prices: coerce (e.g. Decimal) values to float64
        for price_col in ("Open", "High", "Low", "Close"):
            if price_col in frame.columns:
                numeric_prices = pd.to_numeric(frame[price_col], errors="coerce")
                frame[price_col] = numeric_prices.astype("float64")

        # Volume: unparsable values become 0, then integer
        if "Volume" in frame.columns:
            numeric_volume = pd.to_numeric(frame["Volume"], errors="coerce")
            frame["Volume"] = numeric_volume.fillna(0).astype("int64")

        # Timezone-naive index for consistency with the callers' comparisons
        frame.index = pd.to_datetime(frame.index).tz_localize(None)
        return frame
    except Exception as e:
        logger.error(f"Error getting flexible cached data: {e}")
        return None
def _is_trading_day_between(
self, start_date: pd.Timestamp, end_date: pd.Timestamp
) -> bool:
"""
Check if there's a trading day between two dates using market calendar.
Args:
start_date: Start date
end_date: End date
Returns:
True if there's a trading day between the dates
"""
# Add one day to start since we're checking "between"
check_start = start_date + timedelta(days=1)
if check_start > end_date:
return False
# Get trading days in the range
trading_days = self._get_trading_days(check_start, end_date)
return len(trading_days) > 0
def _get_trading_days(self, start_date, end_date) -> pd.DatetimeIndex:
"""
Get all trading days between start and end dates.
Args:
start_date: Start date (can be string or datetime)
end_date: End date (can be string or datetime)
Returns:
DatetimeIndex of trading days (timezone-naive)
"""
# Ensure dates are datetime objects (timezone-naive)
if isinstance(start_date, str):
start_date = pd.to_datetime(start_date).tz_localize(None)
else:
start_date = pd.to_datetime(start_date).tz_localize(None)
if isinstance(end_date, str):
end_date = pd.to_datetime(end_date).tz_localize(None)
else:
end_date = pd.to_datetime(end_date).tz_localize(None)
# Get valid trading days from market calendar
schedule = self.market_calendar.schedule(
start_date=start_date, end_date=end_date
)
# Return timezone-naive index
return schedule.index.tz_localize(None)
def _get_last_trading_day(self, date) -> pd.Timestamp:
"""
Get the last trading day on or before the given date.
Args:
date: Date to check (can be string or datetime)
Returns:
Last trading day as pd.Timestamp
"""
if isinstance(date, str):
date = pd.to_datetime(date)
# Check if the date itself is a trading day
if self._is_trading_day(date):
return date
# Otherwise, find the previous trading day
for i in range(1, 10): # Look back up to 10 days
check_date = date - timedelta(days=i)
if self._is_trading_day(check_date):
return check_date
# Fallback to the date itself if no trading day found
return date
def _is_trading_day(self, date) -> bool:
"""
Check if a specific date is a trading day.
Args:
date: Date to check
Returns:
True if it's a trading day
"""
if isinstance(date, str):
date = pd.to_datetime(date)
schedule = self.market_calendar.schedule(start_date=date, end_date=date)
return len(schedule) > 0
def _get_db_session(self) -> tuple[Session, bool]:
    """
    Hand out a database session together with an ownership flag.

    Returns:
        Tuple of (session, should_close). should_close is False for the
        injected session and True for a session created here, which the
        caller must close.
    """
    injected = self._db_session
    if injected:
        # Injected sessions belong to whoever supplied them
        return injected, False

    try:
        return SessionLocal(), True
    except Exception as e:
        logger.error(f"Failed to get database session: {e}", exc_info=True)
        raise
def _get_or_create_stock(self, session: Session, symbol: str) -> Stock:
    """
    Fetch the Stock row for a symbol, creating it and filling in metadata if needed.

    Args:
        session: Database session
        symbol: Stock ticker symbol

    Returns:
        Stock object
    """
    stock = Stock.get_or_create(session, symbol)

    # Nothing more to do when the company name is already populated
    existing_name = getattr(stock, "company_name", None)
    if not (existing_name is None or existing_name == ""):
        return stock

    # Best-effort enrichment from the yfinance pool; failures are logged
    # and the transaction is rolled back
    try:
        info = self._yf_pool.get_info(symbol)
        stock.company_name = info.get("longName", info.get("shortName"))
        stock.sector = info.get("sector")
        stock.industry = info.get("industry")
        stock.exchange = info.get("exchange")
        stock.currency = info.get("currency", "USD")
        stock.country = info.get("country")
        session.commit()
    except Exception as e:
        logger.warning(f"Could not update stock info for {symbol}: {e}")
        session.rollback()

    return stock
def _get_cached_price_data(
    self, session: Session, symbol: str, start_date: str, end_date: str
) -> pd.DataFrame | None:
    """
    DEPRECATED: Use _get_data_with_smart_cache instead.

    Retained for backward compatibility only. Logs a deprecation warning
    and delegates to _get_cached_data_flexible with the symbol uppercased.
    """
    logger.warning("Using deprecated _get_cached_price_data method")
    normalized_symbol = symbol.upper()
    return self._get_cached_data_flexible(
        session, normalized_symbol, start_date, end_date
    )
def _cache_price_data(
    self, session: Session, symbol: str, df: pd.DataFrame
) -> None:
    """
    Store a price DataFrame in the database cache.

    Errors are logged and the session is rolled back; nothing is raised.

    Args:
        session: Database session
        symbol: Stock ticker symbol
        df: DataFrame with price data (yfinance column names)
    """
    try:
        if df.empty:
            return

        # Database symbols are stored uppercase
        symbol = symbol.upper()

        # Work on a copy so the caller's frame keeps its column names
        to_store = df.copy()
        to_store.rename(
            columns={
                "Open": "open",
                "High": "high",
                "Low": "low",
                "Close": "close",
                "Volume": "volume",
            },
            inplace=True,
        )

        # Debug-level snapshot of what is about to be written
        logger.debug(
            f"DataFrame columns before caching: {to_store.columns.tolist()}"
        )
        logger.debug(f"DataFrame shape: {to_store.shape}")
        logger.debug(f"DataFrame index type: {type(to_store.index)}")
        if not to_store.empty:
            logger.debug(f"Sample row: {to_store.iloc[0].to_dict()}")

        inserted = bulk_insert_price_data(session, symbol, to_store)
        if inserted != 0:
            logger.info(f"Cached {inserted} new price records for {symbol}")
        else:
            logger.info(
                f"No new records cached for {symbol} (data may already exist)"
            )
    except Exception as e:
        logger.error(f"Error caching price data for {symbol}: {e}", exc_info=True)
        session.rollback()
def get_stock_data(
self,
symbol: str,
start_date: str | None = None,
end_date: str | None = None,
period: str | None = None,
interval: str = "1d",
use_cache: bool = True,
) -> pd.DataFrame:
"""
Fetch stock data with database caching support.
This method prioritizes cached data from the database and only fetches
missing data from yfinance when necessary.
Args:
symbol: Stock ticker symbol
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
period: Alternative to start/end dates (e.g., '1d', '5d', '1mo', '3mo', '1y', etc.)
interval: Data interval ('1d', '1wk', '1mo', '1m', '5m', etc.)
use_cache: Whether to use cached data if available
Returns:
DataFrame with stock data
"""
# For non-daily intervals or periods, always fetch fresh data
if interval != "1d" or period:
return self._fetch_stock_data_from_yfinance(
symbol, start_date, end_date, period, interval
)
# Set default dates if not provided
if start_date is None:
start_date = (datetime.now(UTC) - timedelta(days=365)).strftime("%Y-%m-%d")
if end_date is None:
end_date = datetime.now(UTC).strftime("%Y-%m-%d")
# For daily data, adjust end date to last trading day if it's not a trading day
# This prevents unnecessary cache misses on weekends/holidays
if interval == "1d" and use_cache:
end_dt = pd.to_datetime(end_date)
if not self._is_trading_day(end_dt):
last_trading = self._get_last_trading_day(end_dt)
logger.debug(
f"Adjusting end date from {end_date} to last trading day {last_trading.strftime('%Y-%m-%d')}"
)
end_date = last_trading.strftime("%Y-%m-%d")
# If cache is disabled, fetch directly from yfinance
if not use_cache:
logger.info(f"Cache disabled, fetching from yfinance for {symbol}")
return self._fetch_stock_data_from_yfinance(
symbol, start_date, end_date, period, interval
)
# Try a smarter caching approach
try:
return self._get_data_with_smart_cache(
symbol, start_date, end_date, interval
)
except Exception as e:
logger.warning(f"Smart cache failed, falling back to yfinance: {e}")
return self._fetch_stock_data_from_yfinance(
symbol, start_date, end_date, period, interval
)
async def get_stock_data_async(
self,
symbol: str,
start_date: str | None = None,
end_date: str | None = None,
period: str | None = None,
interval: str = "1d",
use_cache: bool = True,
) -> pd.DataFrame:
"""
Async version of get_stock_data for parallel processing.
This method wraps the synchronous get_stock_data method to provide
an async interface for use in parallel backtesting operations.
Args:
symbol: Stock ticker symbol
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
period: Alternative to start/end dates (e.g., '1d', '5d', '1mo', '3mo', '1y', etc.)
interval: Data interval ('1d', '1wk', '1mo', '1m', '5m', etc.)
use_cache: Whether to use cached data if available
Returns:
DataFrame with stock data
"""
import asyncio
import functools
# Run the synchronous method in a thread pool to avoid blocking
loop = asyncio.get_event_loop()
# Use functools.partial to create a callable with all arguments
sync_method = functools.partial(
self.get_stock_data,
symbol=symbol,
start_date=start_date,
end_date=end_date,
period=period,
interval=interval,
use_cache=use_cache,
)
# Execute in thread pool to avoid blocking the event loop
return await loop.run_in_executor(None, sync_method)
@with_stock_data_circuit_breaker(
    use_fallback=False
)  # Fallback handled at higher level
def _fetch_stock_data_from_yfinance(
    self,
    symbol: str,
    start_date: str | None = None,
    end_date: str | None = None,
    period: str | None = None,
    interval: str = "1d",
) -> pd.DataFrame:
    """
    Fetch price history through the yfinance pool, guarded by the circuit breaker.

    The circuit breaker is applied with use_fallback=False because fallback
    is handled by get_stock_data. The returned frame always has the Open,
    High, Low, Close and Volume columns, and a non-empty frame has its
    index named "Date".
    """
    logger.info(
        f"Fetching data from yfinance for {symbol} - Start: {start_date}, End: {end_date}, Period: {period}, Interval: {interval}"
    )

    # All yfinance access goes through the shared connection pool
    history = self._yf_pool.get_history(
        symbol=symbol,
        start=start_date,
        end=end_date,
        period=period,
        interval=interval,
    )

    # Nothing came back: return an empty frame with the expected columns
    if history.empty:
        logger.warning(f"Empty dataframe returned for {symbol}")
        return pd.DataFrame(
            columns=["Open", "High", "Low", "Close", "Volume"]  # type: ignore[arg-type]
        )

    # Backfill any missing OHLCV column with zeros (integer zero for Volume)
    for required in ("Open", "High", "Low", "Close", "Volume"):
        if required in history.columns:
            continue
        logger.warning(
            f"Column {required} missing from data for {symbol}, adding empty column"
        )
        history[required] = 0 if required == "Volume" else 0.0

    history.index.name = "Date"
    return history
def get_maverick_recommendations(
    self, limit: int = 20, min_score: int | None = None
) -> list[dict]:
    """
    Get top Maverick stock recommendations from the database.

    Args:
        limit: Maximum number of recommendations
        min_score: Minimum combined score filter; None disables the filter

    Returns:
        List of stock recommendation dicts ordered by combined score
        (highest first), each extended with "recommendation_type" and
        "reason". Returns an empty list if the query fails.
    """
    session, should_close = self._get_db_session()
    try:
        # Build query with filtering at database level
        query = session.query(MaverickStocks)

        # Compare against None so an explicit threshold of 0 is honoured
        if min_score is not None:
            query = query.filter(MaverickStocks.combined_score >= min_score)

        # Order by score and limit results
        stocks = (
            query.order_by(MaverickStocks.combined_score.desc()).limit(limit).all()
        )

        return [
            {
                **stock.to_dict(),
                "recommendation_type": "maverick_bullish",
                "reason": self._generate_maverick_reason(stock),
            }
            for stock in stocks
        ]
    except Exception as e:
        logger.error(f"Error getting maverick recommendations: {e}")
        return []
    finally:
        # Only close sessions created for this call
        if should_close:
            session.close()
def get_maverick_bear_recommendations(
    self, limit: int = 20, min_score: int | None = None
) -> list[dict]:
    """
    Get top Maverick bear stock recommendations from the database.

    Args:
        limit: Maximum number of recommendations
        min_score: Minimum score filter; None disables the filter

    Returns:
        List of bear recommendation dicts ordered by score (highest first),
        each extended with "recommendation_type" and "reason". Returns an
        empty list if the query fails.
    """
    session, should_close = self._get_db_session()
    try:
        # Build query with filtering at database level
        query = session.query(MaverickBearStocks)

        # Compare against None so an explicit threshold of 0 is honoured
        if min_score is not None:
            query = query.filter(MaverickBearStocks.score >= min_score)

        # Order by score and limit results
        stocks = query.order_by(MaverickBearStocks.score.desc()).limit(limit).all()

        return [
            {
                **stock.to_dict(),
                "recommendation_type": "maverick_bearish",
                "reason": self._generate_bear_reason(stock),
            }
            for stock in stocks
        ]
    except Exception as e:
        logger.error(f"Error getting bear recommendations: {e}")
        return []
    finally:
        # Only close sessions created for this call
        if should_close:
            session.close()
def get_supply_demand_breakout_recommendations(
    self, limit: int = 20, min_momentum_score: float | None = None
) -> list[dict]:
    """
    Get stocks showing supply/demand breakout patterns from accumulation phases.

    A stock qualifies when its close is above the 50/150/200 SMAs and the
    SMAs are stacked 50 > 150 > 200.

    Args:
        limit: Maximum number of recommendations
        min_momentum_score: Minimum momentum score filter; None disables the filter

    Returns:
        List of recommendation dicts ordered by momentum score (highest
        first), each extended with "recommendation_type" and "reason".
        Returns an empty list if the query fails.
    """
    session, should_close = self._get_db_session()
    try:
        # Build query with all filters at database level
        query = session.query(SupplyDemandBreakoutStocks).filter(
            # Price above all moving averages
            SupplyDemandBreakoutStocks.close_price
            > SupplyDemandBreakoutStocks.sma_50,
            SupplyDemandBreakoutStocks.close_price
            > SupplyDemandBreakoutStocks.sma_150,
            SupplyDemandBreakoutStocks.close_price
            > SupplyDemandBreakoutStocks.sma_200,
            # Moving averages aligned shortest-over-longest
            SupplyDemandBreakoutStocks.sma_50 > SupplyDemandBreakoutStocks.sma_150,
            SupplyDemandBreakoutStocks.sma_150 > SupplyDemandBreakoutStocks.sma_200,
        )

        # Compare against None so an explicit threshold of 0 is honoured
        if min_momentum_score is not None:
            query = query.filter(
                SupplyDemandBreakoutStocks.momentum_score >= min_momentum_score
            )

        # Order by momentum score and limit results
        stocks = (
            query.order_by(SupplyDemandBreakoutStocks.momentum_score.desc())
            .limit(limit)
            .all()
        )

        return [
            {
                **stock.to_dict(),
                "recommendation_type": "supply_demand_breakout",
                "reason": self._generate_supply_demand_reason(stock),
            }
            for stock in stocks
        ]
    except Exception as e:
        # Message names this screen; it previously said "trending"
        logger.error(f"Error getting supply/demand breakout recommendations: {e}")
        return []
    finally:
        # Only close sessions created for this call
        if should_close:
            session.close()
def get_all_screening_recommendations(self) -> dict[str, list[dict]]:
    """
    Fetch the latest screening results and annotate every entry.

    Returns:
        Dictionary keyed by screening type; each stock dict gains a
        "recommendation_type" and a "reason". On any error, a dictionary
        with three empty lists is returned.
    """
    try:
        results = get_latest_maverick_screening()

        # (result key, recommendation label, reason builder) per screen
        annotators = (
            (
                "maverick_stocks",
                "maverick_bullish",
                self._generate_maverick_reason_from_dict,
            ),
            (
                "maverick_bear_stocks",
                "maverick_bearish",
                self._generate_bear_reason_from_dict,
            ),
            (
                "supply_demand_breakouts",
                "supply_demand_breakout",
                self._generate_supply_demand_reason_from_dict,
            ),
        )
        for result_key, label, build_reason in annotators:
            for entry in results.get(result_key, []):
                entry["recommendation_type"] = label
                entry["reason"] = build_reason(entry)

        return results
    except Exception as e:
        logger.error(f"Error getting all screening recommendations: {e}")
        return {
            "maverick_stocks": [],
            "maverick_bear_stocks": [],
            "supply_demand_breakouts": [],
        }
def _generate_maverick_reason(self, stock: MaverickStocks) -> str:
    """Build the human-readable bullish reason from a MaverickStocks row."""
    highlights = []

    # Combined score tier (>= 90 exceptional, >= 80 strong)
    combined = getattr(stock, "combined_score", None)
    if combined is not None:
        if combined >= 90:
            highlights.append("Exceptional combined score")
        elif combined >= 80:
            highlights.append("Strong combined score")

    # Momentum score tier, reported as relative strength
    momentum = getattr(stock, "momentum_score", None)
    if momentum is not None:
        if momentum >= 90:
            highlights.append("outstanding relative strength")
        elif momentum >= 80:
            highlights.append("strong relative strength")

    pattern = getattr(stock, "pat", None)
    if not (pattern is None or pattern == ""):
        highlights.append(f"{pattern} pattern detected")

    if getattr(stock, "consolidation", None) == "yes":
        highlights.append("consolidation characteristics")

    squeeze = getattr(stock, "sqz", None)
    if not (squeeze is None or squeeze == ""):
        highlights.append(f"squeeze indicator: {squeeze}")

    if not highlights:
        return "Strong technical setup"
    return "Bullish setup with " + ", ".join(highlights)
def _generate_bear_reason(self, stock: MaverickBearStocks) -> str:
    """Build the human-readable bearish reason from a MaverickBearStocks row."""
    highlights = []

    # Bear score tier (>= 90 exceptional, >= 80 strong)
    bear_score = getattr(stock, "score", None)
    if bear_score is not None:
        if bear_score >= 90:
            highlights.append("Exceptional bear score")
        elif bear_score >= 80:
            highlights.append("Strong bear score")

    momentum = getattr(stock, "momentum_score", None)
    if momentum is not None and momentum <= 30:
        highlights.append("weak relative strength")

    rsi = getattr(stock, "rsi_14", None)
    if rsi is not None and rsi <= 30:
        highlights.append("oversold RSI")

    # Flags count only when they are exactly True
    if getattr(stock, "atr_contraction", False) is True:
        highlights.append("ATR contraction")
    if getattr(stock, "big_down_vol", False) is True:
        highlights.append("heavy selling volume")

    if not highlights:
        return "Weak technical setup"
    return "Bearish setup with " + ", ".join(highlights)
def _generate_supply_demand_reason(self, stock: SupplyDemandBreakoutStocks) -> str:
    """Build the reason text for a supply/demand breakout row; parts are joined by ' with '."""
    parts = ["Supply/demand breakout from accumulation"]

    # Optional relative-strength tier comes right after the headline
    momentum = getattr(stock, "momentum_score", None)
    if momentum is not None:
        if momentum >= 90:
            parts.append("exceptional relative strength")
        elif momentum >= 80:
            parts.append("strong relative strength")

    # These two statements are always included
    parts.extend(
        [
            "price above all major moving averages",
            "moving averages in proper alignment",
        ]
    )

    pattern = getattr(stock, "pat", None)
    if not (pattern is None or pattern == ""):
        parts.append(f"{pattern} pattern")

    return " with ".join(parts)
def _generate_maverick_reason_from_dict(self, stock: dict) -> str:
"""Generate recommendation reason for Maverick stock from dict."""
reasons = []
score = stock.get("combined_score", 0)
if score >= 90:
reasons.append("Exceptional combined score")
elif score >= 80:
reasons.append("Strong combined score")
momentum = stock.get("momentum_score", 0)
if momentum >= 90:
reasons.append("outstanding relative strength")
elif momentum >= 80:
reasons.append("strong relative strength")
if stock.get("pattern"):
reasons.append(f"{stock['pattern']} pattern detected")
if stock.get("consolidation") == "yes":
reasons.append("consolidation characteristics")
if stock.get("squeeze"):
reasons.append(f"squeeze indicator: {stock['squeeze']}")
return (
"Bullish setup with " + ", ".join(reasons)
if reasons
else "Strong technical setup"
)
def _generate_bear_reason_from_dict(self, stock: dict) -> str:
"""Generate recommendation reason for bear stock from dict."""
reasons = []
score = stock.get("score", 0)
if score >= 90:
reasons.append("Exceptional bear score")
elif score >= 80:
reasons.append("Strong bear score")
momentum = stock.get("momentum_score", 100)
if momentum <= 30:
reasons.append("weak relative strength")
rsi = stock.get("rsi_14")
if rsi and rsi <= 30:
reasons.append("oversold RSI")
if stock.get("atr_contraction"):
reasons.append("ATR contraction")
if stock.get("big_down_vol"):
reasons.append("heavy selling volume")
return (
"Bearish setup with " + ", ".join(reasons)
if reasons
else "Weak technical setup"
)
def _generate_supply_demand_reason_from_dict(self, stock: dict) -> str:
"""Generate recommendation reason for supply/demand breakout stock from dict."""
reasons = ["Supply/demand breakout from accumulation"]
momentum = stock.get("momentum_score", 0)
if momentum >= 90:
reasons.append("exceptional relative strength")
elif momentum >= 80:
reasons.append("strong relative strength")
reasons.append("price above all major moving averages")
reasons.append("moving averages in proper alignment")
if stock.get("pattern"):
reasons.append(f"{stock['pattern']} pattern")
return " with ".join(reasons)
# Keep all original methods for backward compatibility
@with_stock_data_circuit_breaker(use_fallback=False)
def get_stock_info(self, symbol: str) -> dict:
    """Return the yfinance info dict for a symbol, guarded by the circuit breaker."""
    # Lookup goes through the shared connection pool
    info = self._yf_pool.get_info(symbol)
    return info
def get_realtime_data(self, symbol):
    """
    Build a latest-quote snapshot for a symbol from yfinance data.

    Returns a dict with symbol, price, change, change_percent, volume and
    timestamp fields, or None when no history is available or any step
    fails (failures are logged).
    """
    try:
        today = self._yf_pool.get_history(symbol, period="1d")
        if today.empty:
            return None

        last_bar = today.iloc[-1]
        last_stamp = today.index[-1]

        # Previous close: prefer the info dict, then 2-day history, then
        # the latest close itself (which yields a zero change)
        prev_close = self._yf_pool.get_info(symbol).get("previousClose", None)
        if prev_close is None:
            two_day = self._yf_pool.get_history(symbol, period="2d")
            if len(two_day) > 1:
                prev_close = two_day.iloc[0]["Close"]
            else:
                prev_close = last_bar["Close"]

        price = last_bar["Close"]
        change = price - prev_close
        # Guard the division when the previous close is zero
        if prev_close != 0:
            change_percent = (change / prev_close) * 100
        else:
            change_percent = 0

        return {
            "symbol": symbol,
            "price": round(price, 2),
            "change": round(change, 2),
            "change_percent": round(change_percent, 2),
            "volume": int(last_bar["Volume"]),
            "timestamp": last_stamp,
            "timestamp_display": last_stamp.strftime("%Y-%m-%d %H:%M:%S"),
            "is_real_time": False,  # yfinance data has some delay
        }
    except Exception as e:
        logger.error(f"Error fetching realtime data for {symbol}: {str(e)}")
        return None
def get_all_realtime_data(self, symbols):
    """Map each symbol to its realtime snapshot, leaving out symbols with no data."""
    return {
        ticker: snapshot
        for ticker in symbols
        if (snapshot := self.get_realtime_data(ticker))
    }
def is_market_open(self) -> bool:
    """
    Check whether the current US/Eastern time is inside regular weekday hours.

    Only the weekday and the 9:30-16:00 window are checked.
    """
    eastern_now = datetime.now(pytz.timezone("US/Eastern"))

    # weekday() is 5 for Saturday and 6 for Sunday
    if eastern_now.weekday() >= 5:
        return False

    # Session bounds for today, both inclusive
    opening_bell = eastern_now.replace(hour=9, minute=30, second=0, microsecond=0)
    closing_bell = eastern_now.replace(hour=16, minute=0, second=0, microsecond=0)
    return opening_bell <= eastern_now <= closing_bell
def get_news(self, symbol: str, limit: int = 10) -> pd.DataFrame:
    """
    Get up to `limit` news items for a stock from yfinance.

    Returns an empty DataFrame with the standard news columns when there is
    no news or the lookup fails (failures are logged).
    """
    try:
        articles = yf.Ticker(symbol).news
        if not articles:
            return pd.DataFrame(
                columns=[  # type: ignore[arg-type]
                    "title",
                    "publisher",
                    "link",
                    "providerPublishTime",
                    "type",
                ]
            )

        news_frame = pd.DataFrame(articles[:limit])

        # Publish time is converted from epoch seconds to datetimes
        if "providerPublishTime" in news_frame.columns:
            publish_seconds = news_frame["providerPublishTime"]
            news_frame["providerPublishTime"] = pd.to_datetime(
                publish_seconds, unit="s"
            )
        return news_frame
    except Exception as e:
        logger.error(f"Error fetching news for {symbol}: {str(e)}")
        return pd.DataFrame(
            columns=["title", "publisher", "link", "providerPublishTime", "type"]  # type: ignore[arg-type]
        )
def get_earnings(self, symbol: str) -> dict:
    """
    Get earnings information for a stock.

    Args:
        symbol: Stock ticker symbol

    Returns:
        Dict with "earnings", "earnings_dates" and "earnings_trend". Each
        value falls back to {} when the attribute is missing, None, or
        (for the two frames) empty. If the lookup raises, all three are {}
        and the error is logged.
    """
    try:
        ticker = yf.Ticker(symbol)

        # Read each attribute exactly once; every access can trigger a
        # fetch, and a None value must not be dereferenced for `.empty`
        earnings = getattr(ticker, "earnings", None)
        earnings_dates = getattr(ticker, "earnings_dates", None)
        earnings_trend = getattr(ticker, "earnings_trend", None)

        return {
            "earnings": earnings.to_dict()
            if earnings is not None and not earnings.empty
            else {},
            "earnings_dates": earnings_dates.to_dict()
            if earnings_dates is not None and not earnings_dates.empty
            else {},
            "earnings_trend": earnings_trend if earnings_trend is not None else {},
        }
    except Exception as e:
        logger.error(f"Error fetching earnings for {symbol}: {str(e)}")
        return {"earnings": {}, "earnings_dates": {}, "earnings_trend": {}}
def get_recommendations(self, symbol: str) -> pd.DataFrame:
    """
    Get analyst recommendations for a stock.

    Returns an empty DataFrame with firm/toGrade/fromGrade/action columns
    when yfinance has nothing or the lookup fails (failures are logged).
    """
    try:
        analyst_view = yf.Ticker(symbol).recommendations
        has_rows = analyst_view is not None and not analyst_view.empty
        if has_rows:
            return analyst_view
        return pd.DataFrame(columns=["firm", "toGrade", "fromGrade", "action"])  # type: ignore[arg-type]
    except Exception as e:
        logger.error(f"Error fetching recommendations for {symbol}: {str(e)}")
        return pd.DataFrame(columns=["firm", "toGrade", "fromGrade", "action"])  # type: ignore[arg-type]
def is_etf(self, symbol: str) -> bool:
    """
    Check if a given symbol is an ETF.

    Uses the yfinance "quoteType" when it is a string. Otherwise falls back
    to heuristics: symbol suffix, a list of well-known ETF tickers, and
    "ETF" appearing in the long name.

    Args:
        symbol: Stock ticker symbol

    Returns:
        True if the symbol is identified as an ETF; False otherwise or if
        the lookup raises (the error is logged).
    """
    # Well-known index and sector ETF tickers used by the fallback check
    known_etfs = {
        "SPY",
        "QQQ",
        "IWM",
        "DIA",
        "XLB",
        "XLE",
        "XLF",
        "XLI",
        "XLK",
        "XLP",
        "XLU",
        "XLV",
        "XLY",
        "XLC",
        "XLRE",
        "XME",
    }
    try:
        # Read the info mapping once; each property access may hit the network
        info = yf.Ticker(symbol).info or {}

        # A None or non-string quoteType falls through to the heuristics
        quote_type = info.get("quoteType")
        if isinstance(quote_type, str):
            return quote_type.upper() == "ETF"

        # longName can be present but None
        long_name = info.get("longName") or ""
        return (
            symbol.endswith(("ETF", "FUND"))
            or symbol in known_etfs
            or "ETF" in long_name.upper()
        )
    except Exception as e:
        logger.error(f"Error checking if {symbol} is ETF: {e}")
        return False
# Maintain backward compatibility: StockDataProvider is an alias bound to the
# same class object as EnhancedStockDataProvider.
StockDataProvider = EnhancedStockDataProvider