Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
test_models_functional.py27.7 kB
""" Functional tests for SQLAlchemy models that test the actual data operations. These tests verify model functionality by reading from the existing production database without creating any new tables or modifying data. """ import os import uuid from datetime import datetime, timedelta from decimal import Decimal import pytest from sqlalchemy import create_engine, text from sqlalchemy.exc import ProgrammingError from sqlalchemy.orm import sessionmaker from maverick_mcp.data.models import ( DATABASE_URL, MaverickBearStocks, MaverickStocks, PriceCache, Stock, SupplyDemandBreakoutStocks, get_latest_maverick_screening, ) @pytest.fixture(scope="session") def read_only_engine(): """Create a read-only database engine for the existing database.""" # Use the existing database URL from environment or default db_url = os.getenv("POSTGRES_URL", DATABASE_URL) try: # Create engine with read-only intent engine = create_engine(db_url, echo=False) # Test the connection with engine.connect() as conn: conn.execute(text("SELECT 1")) except Exception as e: pytest.skip(f"Database not available: {e}") return yield engine engine.dispose() @pytest.fixture(scope="function") def db_session(read_only_engine): """Create a read-only database session for each test.""" SessionLocal = sessionmaker(bind=read_only_engine) session = SessionLocal() yield session session.rollback() # Rollback any potential changes session.close() class TestStockModelReadOnly: """Test the Stock model functionality with read-only operations.""" def test_query_stocks(self, db_session): """Test querying existing stocks from the database.""" # Query for any existing stocks stocks = db_session.query(Stock).limit(5).all() # If there are stocks in the database, verify their structure for stock in stocks: assert hasattr(stock, "stock_id") assert hasattr(stock, "ticker_symbol") assert hasattr(stock, "created_at") assert hasattr(stock, "updated_at") # Verify timestamps are timezone-aware if stock.created_at: assert stock.created_at.tzinfo is not None if stock.updated_at: assert stock.updated_at.tzinfo is not None def test_query_by_ticker(self, db_session): """Test querying stock by ticker symbol.""" # Try to find a common stock like AAPL stock = db_session.query(Stock).filter_by(ticker_symbol="AAPL").first() if stock: assert stock.ticker_symbol == "AAPL" assert isinstance(stock.stock_id, uuid.UUID) def test_stock_repr(self, db_session): """Test string representation of Stock.""" stock = db_session.query(Stock).first() if stock: repr_str = repr(stock) assert "<Stock(" in repr_str assert "ticker=" in repr_str assert stock.ticker_symbol in repr_str def test_stock_relationships(self, db_session): """Test stock relationships with price caches.""" # Find a stock that has price data stock_with_prices = db_session.query(Stock).join(PriceCache).distinct().first() if stock_with_prices: # Access the relationship price_caches = stock_with_prices.price_caches assert isinstance(price_caches, list) # Verify each price cache belongs to this stock for pc in price_caches[:5]: # Check first 5 assert pc.stock_id == stock_with_prices.stock_id assert pc.stock == stock_with_prices class TestPriceCacheModelReadOnly: """Test the PriceCache model functionality with read-only operations.""" def test_query_price_cache(self, db_session): """Test querying existing price cache entries.""" # Query for any existing price data prices = db_session.query(PriceCache).limit(10).all() # Verify structure of price entries for price in prices: assert hasattr(price, "price_cache_id") assert hasattr(price, "stock_id") assert hasattr(price, "date") assert hasattr(price, "close_price") # Verify data types if price.price_cache_id: assert isinstance(price.price_cache_id, uuid.UUID) if price.close_price: assert isinstance(price.close_price, Decimal) def test_price_cache_repr(self, db_session): """Test string representation of PriceCache.""" price = db_session.query(PriceCache).first() if price: repr_str = repr(price) assert "<PriceCache(" in repr_str assert "stock_id=" in repr_str assert "date=" in repr_str assert "close=" in repr_str def test_get_price_data(self, db_session): """Test retrieving price data as DataFrame for existing tickers.""" # Try to get price data for a common stock # Use a recent date range that might have data end_date = datetime.now().date() start_date = end_date - timedelta(days=30) # Try common tickers for ticker in ["AAPL", "MSFT", "GOOGL"]: df = PriceCache.get_price_data( db_session, ticker, start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"), ) if not df.empty: # Verify DataFrame structure assert df.index.name == "date" assert "open" in df.columns assert "high" in df.columns assert "low" in df.columns assert "close" in df.columns assert "volume" in df.columns assert "symbol" in df.columns assert df["symbol"].iloc[0] == ticker # Verify data types assert df["close"].dtype == float assert df["volume"].dtype == int break def test_stock_relationship(self, db_session): """Test relationship back to Stock.""" # Find a price entry with stock relationship price = db_session.query(PriceCache).join(Stock).first() if price: assert price.stock is not None assert price.stock.stock_id == price.stock_id assert hasattr(price.stock, "ticker_symbol") @pytest.mark.integration class TestMaverickStocksReadOnly: """Test MaverickStocks model functionality with read-only operations.""" def test_query_maverick_stocks(self, db_session): """Test querying existing maverick stock entries.""" try: # Query for any existing maverick stocks mavericks = db_session.query(MaverickStocks).limit(10).all() # Verify structure of maverick entries for maverick in mavericks: assert hasattr(maverick, "id") assert hasattr(maverick, "stock") assert hasattr(maverick, "close") assert hasattr(maverick, "combined_score") assert hasattr(maverick, "momentum_score") except Exception as e: if "does not exist" in str(e): pytest.skip(f"MaverickStocks table not found: {e}") else: raise def test_maverick_repr(self, db_session): """Test string representation of MaverickStocks.""" try: maverick = db_session.query(MaverickStocks).first() if maverick: repr_str = repr(maverick) assert "<MaverickStock(" in repr_str assert "stock=" in repr_str assert "close=" in repr_str assert "score=" in repr_str except ProgrammingError as e: if "does not exist" in str(e): pytest.skip(f"MaverickStocks table not found: {e}") else: raise def test_get_top_stocks(self, db_session): """Test retrieving top maverick stocks.""" try: # Get top stocks from existing data top_stocks = MaverickStocks.get_top_stocks(db_session, limit=20) # Verify results are sorted by combined_score if len(top_stocks) > 1: for i in range(len(top_stocks) - 1): assert ( top_stocks[i].combined_score >= top_stocks[i + 1].combined_score ) # Verify limit is respected assert len(top_stocks) <= 20 except ProgrammingError as e: if "does not exist" in str(e): pytest.skip(f"MaverickStocks table not found: {e}") else: raise def test_maverick_to_dict(self, db_session): """Test converting MaverickStocks to dictionary.""" try: maverick = db_session.query(MaverickStocks).first() if maverick: data = maverick.to_dict() # Verify expected keys expected_keys = [ "stock", "close", "volume", "momentum_score", "adr_pct", "pattern", "squeeze", "consolidation", "entry", "combined_score", "compression_score", "pattern_detected", ] for key in expected_keys: assert key in data # Verify data types assert isinstance(data["stock"], str) assert isinstance(data["combined_score"], int | type(None)) except ProgrammingError as e: if "does not exist" in str(e): pytest.skip(f"MaverickStocks table not found: {e}") else: raise @pytest.mark.integration class TestMaverickBearStocksReadOnly: """Test MaverickBearStocks model functionality with read-only operations.""" def test_query_bear_stocks(self, db_session): """Test querying existing maverick bear stock entries.""" try: # Query for any existing bear stocks bears = db_session.query(MaverickBearStocks).limit(10).all() # Verify structure of bear entries for bear in bears: assert hasattr(bear, "id") assert hasattr(bear, "stock") assert hasattr(bear, "close") assert hasattr(bear, "score") assert hasattr(bear, "momentum_score") assert hasattr(bear, "rsi_14") assert hasattr(bear, "atr_contraction") assert hasattr(bear, "big_down_vol") except Exception as e: if "does not exist" in str(e): pytest.skip(f"MaverickBearStocks table not found: {e}") else: raise def test_bear_repr(self, db_session): """Test string representation of MaverickBearStocks.""" try: bear = db_session.query(MaverickBearStocks).first() if bear: repr_str = repr(bear) assert "<MaverickBearStock(" in repr_str assert "stock=" in repr_str assert "close=" in repr_str assert "score=" in repr_str except ProgrammingError as e: if "does not exist" in str(e): pytest.skip(f"MaverickBearStocks table not found: {e}") else: raise def test_get_top_bear_stocks(self, db_session): """Test retrieving top bear stocks.""" try: # Get top bear stocks from existing data top_bears = MaverickBearStocks.get_top_stocks(db_session, limit=20) # Verify results are sorted by score if len(top_bears) > 1: for i in range(len(top_bears) - 1): assert top_bears[i].score >= top_bears[i + 1].score # Verify limit is respected assert len(top_bears) <= 20 except ProgrammingError as e: if "does not exist" in str(e): pytest.skip(f"MaverickBearStocks table not found: {e}") else: raise def test_bear_to_dict(self, db_session): """Test converting MaverickBearStocks to dictionary.""" try: bear = db_session.query(MaverickBearStocks).first() if bear: data = bear.to_dict() # Verify expected keys expected_keys = [ "stock", "close", "volume", "momentum_score", "rsi_14", "macd", "macd_signal", "macd_histogram", "adr_pct", "atr", "atr_contraction", "avg_vol_30d", "big_down_vol", "score", "squeeze", "consolidation", ] for key in expected_keys: assert key in data # Verify boolean fields assert isinstance(data["atr_contraction"], bool) assert isinstance(data["big_down_vol"], bool) except ProgrammingError as e: if "does not exist" in str(e): pytest.skip(f"MaverickBearStocks table not found: {e}") else: raise @pytest.mark.integration class TestSupplyDemandBreakoutStocksReadOnly: """Test SupplyDemandBreakoutStocks model functionality with read-only operations.""" def test_query_supply_demand_stocks(self, db_session): """Test querying existing supply/demand breakout stock entries.""" try: # Query for any existing supply/demand breakout stocks stocks = db_session.query(SupplyDemandBreakoutStocks).limit(10).all() # Verify structure of supply/demand breakout entries for stock in stocks: assert hasattr(stock, "id") assert hasattr(stock, "stock") assert hasattr(stock, "close") assert hasattr(stock, "momentum_score") assert hasattr(stock, "sma_50") assert hasattr(stock, "sma_150") assert hasattr(stock, "sma_200") except Exception as e: if "does not exist" in str(e): pytest.skip(f"SupplyDemandBreakoutStocks table not found: {e}") else: raise def test_supply_demand_repr(self, db_session): """Test string representation of SupplyDemandBreakoutStocks.""" try: supply_demand = db_session.query(SupplyDemandBreakoutStocks).first() if supply_demand: repr_str = repr(supply_demand) assert "<supply/demand breakoutStock(" in repr_str assert "stock=" in repr_str assert "close=" in repr_str assert "rs=" in repr_str except ProgrammingError as e: if "does not exist" in str(e): pytest.skip(f"SupplyDemandBreakoutStocks table not found: {e}") else: raise def test_get_top_supply_demand_stocks(self, db_session): """Test retrieving top supply/demand breakout stocks.""" try: # Get top stocks from existing data top_stocks = SupplyDemandBreakoutStocks.get_top_stocks(db_session, limit=20) # Verify results are sorted by momentum_score if len(top_stocks) > 1: for i in range(len(top_stocks) - 1): assert ( top_stocks[i].momentum_score >= top_stocks[i + 1].momentum_score ) # Verify limit is respected assert len(top_stocks) <= 20 except ProgrammingError as e: if "does not exist" in str(e): pytest.skip(f"SupplyDemandBreakoutStocks table not found: {e}") else: raise def test_get_stocks_above_moving_averages(self, db_session): """Test retrieving stocks meeting supply/demand breakout criteria.""" try: # Get stocks that meet supply/demand breakout criteria from existing data stocks = SupplyDemandBreakoutStocks.get_stocks_above_moving_averages( db_session ) # Verify all returned stocks meet the criteria for stock in stocks: assert stock.close > stock.sma_50 assert stock.close > stock.sma_150 assert stock.close > stock.sma_200 assert stock.sma_50 > stock.sma_150 assert stock.sma_150 > stock.sma_200 # Verify they're sorted by momentum score if len(stocks) > 1: for i in range(len(stocks) - 1): assert stocks[i].momentum_score >= stocks[i + 1].momentum_score except ProgrammingError as e: if "does not exist" in str(e): pytest.skip(f"SupplyDemandBreakoutStocks table not found: {e}") else: raise def test_supply_demand_to_dict(self, db_session): """Test converting SupplyDemandBreakoutStocks to dictionary.""" try: supply_demand = db_session.query(SupplyDemandBreakoutStocks).first() if supply_demand: data = supply_demand.to_dict() # Verify expected keys expected_keys = [ "stock", "close", "volume", "momentum_score", "adr_pct", "pattern", "squeeze", "consolidation", "entry", "ema_21", "sma_50", "sma_150", "sma_200", "atr", "avg_volume_30d", ] for key in expected_keys: assert key in data # Verify data types assert isinstance(data["stock"], str) assert isinstance(data["momentum_score"], float | int) except ProgrammingError as e: if "does not exist" in str(e): pytest.skip(f"SupplyDemandBreakoutStocks table not found: {e}") else: raise @pytest.mark.integration class TestGetLatestMaverickScreeningReadOnly: """Test the get_latest_maverick_screening function with read-only operations.""" def test_get_latest_screening(self): """Test retrieving latest screening results from existing data.""" try: # Call the function directly - it creates its own session results = get_latest_maverick_screening() # Verify structure assert isinstance(results, dict) assert "maverick_stocks" in results assert "maverick_bear_stocks" in results assert "supply_demand_stocks" in results # Verify each result is a list of dictionaries assert isinstance(results["maverick_stocks"], list) assert isinstance(results["maverick_bear_stocks"], list) assert isinstance(results["supply_demand_stocks"], list) # If there are maverick stocks, verify their structure if results["maverick_stocks"]: stock_dict = results["maverick_stocks"][0] assert isinstance(stock_dict, dict) assert "stock" in stock_dict assert "combined_score" in stock_dict # Verify they're sorted by combined_score scores = [s["combined_score"] for s in results["maverick_stocks"]] assert scores == sorted(scores, reverse=True) # If there are bear stocks, verify their structure if results["maverick_bear_stocks"]: bear_dict = results["maverick_bear_stocks"][0] assert isinstance(bear_dict, dict) assert "stock" in bear_dict assert "score" in bear_dict # Verify they're sorted by score scores = [s["score"] for s in results["maverick_bear_stocks"]] assert scores == sorted(scores, reverse=True) # If there are supply/demand breakout stocks, verify their structure if results["supply_demand_stocks"]: min_dict = results["supply_demand_stocks"][0] assert isinstance(min_dict, dict) assert "stock" in min_dict assert "momentum_score" in min_dict # Verify they're sorted by momentum_score ratings = [s["momentum_score"] for s in results["supply_demand_stocks"]] assert ratings == sorted(ratings, reverse=True) except Exception as e: # If tables don't exist, that's okay for read-only tests if "does not exist" in str(e): pytest.skip(f"Screening tables not found in database: {e}") else: raise class TestDatabaseStructureReadOnly: """Test database structure and relationships with read-only operations.""" def test_stock_ticker_query_performance(self, db_session): """Test that ticker queries work efficiently (index should exist).""" # Query for a specific ticker - should be fast if indexed import time start_time = time.time() # Try to find a stock by ticker stock = db_session.query(Stock).filter_by(ticker_symbol="AAPL").first() query_time = time.time() - start_time # Query should be reasonably fast if properly indexed # Allow up to 1 second for connection overhead assert query_time < 1.0 # If stock exists, verify it has expected fields if stock: assert stock.ticker_symbol == "AAPL" def test_price_cache_date_query_performance(self, db_session): """Test that price cache queries by stock and date are efficient.""" # First find a stock with prices stock_with_prices = db_session.query(Stock).join(PriceCache).first() if stock_with_prices: # Get a recent date recent_price = ( db_session.query(PriceCache) .filter_by(stock_id=stock_with_prices.stock_id) .order_by(PriceCache.date.desc()) .first() ) if recent_price: # Query for specific stock_id and date - should be fast import time start_time = time.time() result = ( db_session.query(PriceCache) .filter_by( stock_id=stock_with_prices.stock_id, date=recent_price.date ) .first() ) query_time = time.time() - start_time # Query should be reasonably fast if composite index exists assert query_time < 1.0 assert result is not None assert result.price_cache_id == recent_price.price_cache_id class TestDataTypesAndConstraintsReadOnly: """Test data types and constraints with read-only operations.""" def test_null_values_in_existing_data(self, db_session): """Test handling of null values in optional fields in existing data.""" # Query stocks that might have null values stocks = db_session.query(Stock).limit(20).all() for stock in stocks: # These fields are optional and can be None assert hasattr(stock, "company_name") assert hasattr(stock, "sector") assert hasattr(stock, "industry") # Verify ticker_symbol is never null (it's required) assert stock.ticker_symbol is not None assert isinstance(stock.ticker_symbol, str) def test_decimal_precision_in_existing_data(self, db_session): """Test decimal precision in existing price data.""" # Query some price data prices = db_session.query(PriceCache).limit(10).all() for price in prices: # Verify decimal fields if price.close_price is not None: assert isinstance(price.close_price, Decimal) # Check precision (should have at most 2 decimal places) str_price = str(price.close_price) if "." in str_price: decimal_places = len(str_price.split(".")[1]) assert decimal_places <= 2 # Same for other price fields for field in ["open_price", "high_price", "low_price"]: value = getattr(price, field) if value is not None: assert isinstance(value, Decimal) def test_volume_data_types(self, db_session): """Test volume data types in existing data.""" # Query price data with volumes prices = ( db_session.query(PriceCache) .filter(PriceCache.volume.isnot(None)) .limit(10) .all() ) for price in prices: assert isinstance(price.volume, int) assert price.volume >= 0 def test_timezone_handling_in_existing_data(self, db_session): """Test that timestamps have timezone info in existing data.""" # Query any model with timestamps stocks = db_session.query(Stock).limit(5).all() # Skip test if no stocks found if not stocks: pytest.skip("No stock data found in database") # Check if data has timezone info (newer data should, legacy data might not) has_tz_info = False for stock in stocks: if stock.created_at and stock.created_at.tzinfo is not None: has_tz_info = True # Data should have timezone info (not necessarily UTC for legacy data) # New data created by the app will be UTC if stock.updated_at and stock.updated_at.tzinfo is not None: has_tz_info = True # Data should have timezone info (not necessarily UTC for legacy data) # This test just verifies that timezone-aware timestamps are being used # Legacy data might not be UTC, but new data will be if has_tz_info: # Pass - data has timezone info which is what we want pass else: pytest.skip( "Legacy data without timezone info - new data will have timezone info" ) def test_relationships_integrity(self, db_session): """Test that relationships maintain referential integrity.""" # Find prices with valid stock relationships prices_with_stocks = db_session.query(PriceCache).join(Stock).limit(10).all() for price in prices_with_stocks: # Verify the relationship is intact assert price.stock is not None assert price.stock.stock_id == price.stock_id # Verify reverse relationship assert price in price.stock.price_caches

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server