Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
load_market_data.py (12.2 kB)
#!/usr/bin/env python3
"""
Market data loading script for self-contained Maverick-MCP database.

This script loads stock and price data from Tiingo API into the self-contained
mcp_ prefixed tables, making Maverick-MCP completely independent.

Usage:
    python scripts/load_market_data.py --symbols AAPL,MSFT,GOOGL
    python scripts/load_market_data.py --file symbols.txt
    python scripts/load_market_data.py --sp500  # Load S&P 500 stocks
"""

import argparse
import asyncio
import logging
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path

import aiohttp
import pandas as pd

# Add parent directory to path for imports
sys.path.append(str(Path(__file__).parent.parent))

from maverick_mcp.config.database_self_contained import (  # noqa: E402
    SelfContainedDatabaseSession,
    get_self_contained_db_config,
    init_self_contained_database,
)
from maverick_mcp.data.models import (  # noqa: E402
    Stock,
    bulk_insert_price_data,
)

# Set up logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("market_data_loader")

# Tiingo allows 2400 requests/hour, i.e. at most one request every 1.5 seconds.
REQUEST_INTERVAL_SECONDS = 1.5

# Length of the price history window requested for each symbol (last 2 years).
PRICE_HISTORY_DAYS = 730

# Tiingo price fields that must be renamed to match our model's column names.
# The unadjusted fields (open/high/low/close/volume) already match.
_PRICE_COLUMN_MAPPING = {
    "adjOpen": "adj_open",
    "adjHigh": "adj_high",
    "adjLow": "adj_low",
    "adjClose": "adj_close",
    "adjVolume": "adj_volume",
}


class TiingoDataLoader:
    """Loads market data from Tiingo API into self-contained database.

    Intended to be used as an async context manager so that the underlying
    HTTP session is opened and closed deterministically::

        async with TiingoDataLoader() as loader:
            await loader.load_stock_data(["AAPL"])
    """

    def __init__(self, api_token: str | None = None):
        """
        Initialize Tiingo data loader.

        Args:
            api_token: Tiingo API token. If None, will use TIINGO_API_TOKEN env var

        Raises:
            ValueError: If no token is passed and TIINGO_API_TOKEN is unset/empty.
        """
        self.api_token = api_token or os.getenv("TIINGO_API_TOKEN")
        if not self.api_token:
            raise ValueError("Tiingo API token required. Set TIINGO_API_TOKEN env var.")

        self.base_url = "https://api.tiingo.com/tiingo"
        # Created in __aenter__ and released in __aexit__.
        self.session: aiohttp.ClientSession | None = None

    async def __aenter__(self):
        """Async context manager entry: open the authenticated HTTP session."""
        self.session = aiohttp.ClientSession(
            headers={"Authorization": f"Token {self.api_token}"}
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the HTTP session if one is open."""
        if self.session:
            await self.session.close()
            # Reset so later calls hit the "session not open" guard instead of
            # trying to use a closed session.
            self.session = None

    async def get_stock_metadata(self, symbol: str) -> dict | None:
        """
        Get stock metadata from Tiingo.

        Args:
            symbol: Stock ticker symbol

        Returns:
            Stock metadata dict, or None if the stock was not found, the
            request failed, or the HTTP session is not open.
        """
        if self.session is None:
            logger.error(
                "Cannot fetch metadata for %s: HTTP session is not open "
                "(use 'async with TiingoDataLoader()')",
                symbol,
            )
            return None

        url = f"{self.base_url}/daily/{symbol}"

        # Deliberately broad: one failing symbol must not abort a batch load.
        try:
            async with self.session.get(url) as response:
                if response.status == 200:
                    return await response.json()
                if response.status == 404:
                    logger.warning("Stock %s not found in Tiingo", symbol)
                else:
                    logger.error(
                        "Error fetching metadata for %s: %s", symbol, response.status
                    )
                return None
        except Exception as e:
            logger.error("Exception fetching metadata for %s: %s", symbol, e)
            return None

    async def get_price_data(
        self, symbol: str, start_date: str, end_date: str | None = None
    ) -> pd.DataFrame | None:
        """
        Get historical price data from Tiingo.

        Args:
            symbol: Stock ticker symbol
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format (default: today)

        Returns:
            DataFrame indexed by date with the price columns renamed to the
            model's snake_case names plus a "symbol" column, or None if no
            data was returned, the request failed, or the HTTP session is
            not open.
        """
        if self.session is None:
            logger.error(
                "Cannot fetch prices for %s: HTTP session is not open "
                "(use 'async with TiingoDataLoader()')",
                symbol,
            )
            return None

        if not end_date:
            end_date = datetime.now().strftime("%Y-%m-%d")

        url = f"{self.base_url}/daily/{symbol}/prices"
        params = {"startDate": start_date, "endDate": end_date, "format": "json"}

        # Deliberately broad: one failing symbol must not abort a batch load.
        try:
            async with self.session.get(url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    if not data:
                        return None

                    df = pd.DataFrame(data)

                    # Convert date column and set as index
                    df["date"] = pd.to_datetime(df["date"]).dt.date
                    df.set_index("date", inplace=True)

                    # Rename columns to match our model
                    df = df.rename(columns=_PRICE_COLUMN_MAPPING)
                    df["symbol"] = symbol.upper()

                    logger.info("Loaded %s price records for %s", len(df), symbol)
                    return df

                if response.status == 404:
                    logger.warning("Price data for %s not found", symbol)
                else:
                    logger.error(
                        "Error fetching prices for %s: %s", symbol, response.status
                    )
                return None
        except Exception as e:
            logger.error("Exception fetching prices for %s: %s", symbol, e)
            return None

    async def load_stock_data(self, symbols: list[str]) -> int:
        """
        Load stock metadata and price data for multiple symbols.

        For each symbol the metadata is fetched first; symbols without
        metadata are skipped. A pause of REQUEST_INTERVAL_SECONDS follows
        every API request (two per fully loaded symbol) so the overall request
        rate stays within the hourly limit.

        Args:
            symbols: List of stock ticker symbols

        Returns:
            Number of stocks for which price data was inserted
        """
        loaded_count = 0

        # Same window for every symbol, so compute it once.
        start_date = (datetime.now() - timedelta(days=PRICE_HISTORY_DAYS)).strftime(
            "%Y-%m-%d"
        )

        with SelfContainedDatabaseSession() as session:
            for symbol in symbols:
                logger.info("Loading data for %s...", symbol)

                # Get stock metadata
                metadata = await self.get_stock_metadata(symbol)
                # Throttle after the request itself, including when the symbol
                # is about to be skipped, so misses do not burst the API.
                await asyncio.sleep(REQUEST_INTERVAL_SECONDS)
                if not metadata:
                    continue

                # Create or update stock record
                Stock.get_or_create(
                    session,
                    symbol,
                    company_name=metadata.get("name", ""),
                    description=metadata.get("description", ""),
                    exchange=metadata.get("exchangeCode", ""),
                    currency="USD",  # Tiingo uses USD
                )

                # Load price data (last 2 years)
                price_df = await self.get_price_data(symbol, start_date)
                await asyncio.sleep(REQUEST_INTERVAL_SECONDS)

                if price_df is not None and not price_df.empty:
                    # Insert price data
                    records_inserted = bulk_insert_price_data(session, symbol, price_df)
                    logger.info(
                        "Inserted %s price records for %s", records_inserted, symbol
                    )
                    loaded_count += 1

        return loaded_count


def get_sp500_symbols() -> list[str]:
    """Get S&P 500 stock symbols from a predefined list."""
    # Top 100 S&P 500 stocks for initial loading
    # NOTE(review): "BRK.B" uses a dot separator; confirm the ticker format
    # Tiingo expects for share classes (it may be "BRK-B").
    return [
        "AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "META", "NVDA", "BRK.B", "UNH", "JNJ",
        "V", "PG", "JPM", "HD", "CVX", "MA", "PFE", "ABBV", "BAC", "KO",
        "AVGO", "PEP", "TMO", "COST", "WMT", "DIS", "ABT", "ACN", "NFLX", "ADBE",
        "CRM", "VZ", "DHR", "INTC", "NKE", "T", "TXN", "BMY", "QCOM", "PM",
        "UPS", "HON", "ORCL", "WFC", "LOW", "LIN", "AMD", "SBUX", "IBM", "GE",
        "CAT", "MDT", "BA", "AXP", "GILD", "RTX", "GS", "BLK", "MMM", "CVS",
        "ISRG", "NOW", "AMT", "SPGI", "PLD", "SYK", "TJX", "MDLZ", "ZTS", "MO",
        "CB", "CI", "PYPL", "SO", "EL", "DE", "REGN", "CCI", "USB", "BSX",
        "DUK", "AON", "CSX", "CL", "ITW", "PNC", "FCX", "SCHW", "EMR", "NSC",
        "GM", "FDX", "MU", "BDX", "TGT", "EOG", "SLB", "ICE", "EQIX", "APD",
    ]  # fmt: skip


def load_symbols_from_file(file_path: str) -> list[str]:
    """
    Load stock symbols from a text file.

    Blank lines and lines starting with "#" are ignored; symbols are
    stripped and upper-cased. The process exits with status 1 if the file
    cannot be read.

    Args:
        file_path: Path to file containing stock symbols (one per line)

    Returns:
        List of stock symbols
    """
    symbols = []
    try:
        # "utf-8-sig" reads plain UTF-8 and also strips a leading BOM, which
        # would otherwise become part of the first symbol.
        with open(file_path, encoding="utf-8-sig") as f:
            for line in f:
                symbol = line.strip().upper()
                if symbol and not symbol.startswith("#"):
                    symbols.append(symbol)
        logger.info("Loaded %s symbols from %s", len(symbols), file_path)
    except FileNotFoundError:
        logger.error("Symbol file not found: %s", file_path)
        sys.exit(1)
    except Exception as e:
        logger.error("Error reading symbol file %s: %s", file_path, e)
        sys.exit(1)

    return symbols


async def main():
    """Main function to load market data.

    Parses the command line, initializes the database, loads data for the
    selected symbols and prints database statistics. Exits with status 1 on
    missing/empty symbol selection, database initialization failure or
    data loading failure.
    """
    parser = argparse.ArgumentParser(
        description="Load market data into self-contained database"
    )
    parser.add_argument(
        "--symbols",
        type=str,
        help="Comma-separated list of stock symbols (e.g., AAPL,MSFT,GOOGL)",
    )
    parser.add_argument(
        "--file", type=str, help="Path to file containing stock symbols (one per line)"
    )
    parser.add_argument(
        "--sp500", action="store_true", help="Load top 100 S&P 500 stocks"
    )
    parser.add_argument(
        "--create-tables",
        action="store_true",
        help="Create database tables if they don't exist",
    )
    parser.add_argument("--database-url", type=str, help="Override database URL")

    args = parser.parse_args()

    # Determine symbols to load
    if args.symbols:
        symbols = [s.strip().upper() for s in args.symbols.split(",")]
    elif args.file:
        symbols = load_symbols_from_file(args.file)
    elif args.sp500:
        symbols = get_sp500_symbols()
    else:
        parser.print_help()
        sys.exit(1)

    # Drop blanks (e.g. from "AAPL,,MSFT,") and duplicates, keeping first-seen
    # order, so no API requests are spent on empty or repeated tickers.
    symbols = list(dict.fromkeys(s for s in symbols if s))
    if not symbols:
        logger.error("No symbols to load")
        sys.exit(1)

    logger.info("Will load data for %s symbols", len(symbols))

    # Initialize self-contained database
    try:
        init_self_contained_database(
            database_url=args.database_url, create_tables=args.create_tables
        )
        logger.info("Self-contained database initialized")
    except Exception as e:
        logger.error("Database initialization failed: %s", e)
        sys.exit(1)

    # Load market data
    try:
        async with TiingoDataLoader() as loader:
            loaded_count = await loader.load_stock_data(symbols)
            logger.info(
                "Successfully loaded data for %s/%s stocks", loaded_count, len(symbols)
            )
    except Exception as e:
        logger.error("Data loading failed: %s", e)
        sys.exit(1)

    # Display database stats
    db_config = get_self_contained_db_config()
    stats = db_config.get_database_stats()

    print("\n📊 Database Statistics:")
    print(f" Total Records: {stats['total_records']}")
    for table, count in stats["tables"].items():
        print(f" {table}: {count}")

    print("\n✅ Market data loading completed successfully!")


if __name__ == "__main__":
    asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.