#!/usr/bin/env python3
"""
Test script for the fetch_and_update_quotes tool.
Tests quote downloading and updating functionality.
"""
import os
import sys
import sqlite3
from datetime import datetime, timedelta
from dotenv import load_dotenv
# Put the directory one level above this script's folder at the front of
# sys.path so modules located there can be imported.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Load environment variables (python-dotenv; picks up a .env file if present)
load_dotenv()
# Path of the SQLite database used by the tests; None when DB_PATH is unset.
DB_PATH = os.getenv("DB_PATH")
def test_fetch_quotes_for_specific_symbols():
    """Fetch a week of daily quotes for a fixed list of tickers.

    A status line is printed for every ticker; failures for individual
    tickers are reported but do not abort the run.

    Returns:
        False when yfinance cannot be imported, True otherwise (regardless
        of how many tickers actually returned data).
    """
    banner = "=" * 60
    print("\n" + banner)
    print("TEST 1: Fetch quotes for specific symbols")
    print(banner)

    # A handful of well-known tickers
    tickers = ["AAPL", "MSFT", "GOOGL"]

    # Date window: the last seven days up to today
    day_format = "%Y-%m-%d"
    end_date = datetime.now().strftime(day_format)
    window_start = datetime.now() - timedelta(days=7)
    start_date = window_start.strftime(day_format)

    print(f"\nSymbols: {', '.join(tickers)}")
    print(f"Period: {start_date} to {end_date}")

    # yfinance is imported lazily so a missing install is reported cleanly
    try:
        import yfinance as yf
    except ImportError:
        print("[X] ERROR: yfinance not installed. Run 'uv sync' first.")
        return False

    print("\n[FETCH] Fetching data from Yahoo Finance...")
    for name in tickers:
        try:
            history = yf.Ticker(name).history(
                start=start_date, end=end_date, interval="1d"
            )
            if not history.empty:
                print(f" [OK] {name}: {len(history)} quotes retrieved")
                print(f" Latest: ${history['Close'].iloc[-1]:.2f}")
            else:
                print(f" [!] {name}: No data available")
        except Exception as exc:
            # Keep going with the remaining tickers; show a short reason only
            print(f" [X] {name}: Error - {str(exc)[:50]}")
    return True
def test_fetch_quotes_from_portfolio():
    """Test fetching quotes for symbols stored in the portfolio database.

    Reads up to five distinct non-null symbols from the ``assets`` table of
    the database at ``DB_PATH`` and requests one week of daily quotes for
    each of them from Yahoo Finance, printing a status line per symbol and a
    final summary.

    Returns:
        False when the database file is missing, no symbols are found, or
        yfinance cannot be imported; True otherwise (individual fetch
        failures are reported but do not fail the test).

    Raises:
        sqlite3.Error: if the symbol query fails (e.g. the ``assets`` table
            does not exist). The connection is closed before propagating.
    """
    print("\n" + "="*60)
    print("TEST 2: Fetch quotes for portfolio symbols")
    print("="*60)
    if not DB_PATH or not os.path.exists(DB_PATH):
        print(f"[X] ERROR: Database not found at {DB_PATH}")
        return False
    # Get symbols from database. The connection is closed in `finally` so it
    # is released even when the query itself raises.
    conn = sqlite3.connect(DB_PATH)
    try:
        c = conn.cursor()
        c.execute("SELECT DISTINCT symbol FROM assets WHERE symbol IS NOT NULL LIMIT 5")
        symbols = [row[0] for row in c.fetchall()]
    finally:
        conn.close()
    if not symbols:
        print("[!] No symbols found in portfolio")
        return False
    print(f"\nFound {len(symbols)} symbols in portfolio (showing first 5)")
    print(f"Symbols: {', '.join(symbols)}")
    # Calculate dates
    end_date = datetime.now().strftime("%Y-%m-%d")
    start_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
    print(f"Period: {start_date} to {end_date}")
    # Import yfinance lazily so a missing install is reported cleanly
    try:
        import yfinance as yf
    except ImportError:
        print("[X] ERROR: yfinance not installed. Run 'uv sync' first.")
        return False
    print("\n[FETCH] Fetching data from Yahoo Finance...")
    success_count = 0
    for symbol in symbols:
        try:
            ticker = yf.Ticker(symbol)
            hist = ticker.history(start=start_date, end=end_date, interval="1d")
            if hist.empty:
                print(f" [!] {symbol}: No data available")
            else:
                print(f" [OK] {symbol}: {len(hist)} quotes retrieved")
                success_count += 1
        except Exception as e:
            # Best effort: report a short reason and continue with the rest
            print(f" [X] {symbol}: Error - {str(e)[:50]}")
    print(f"\n[SUMMARY] {success_count}/{len(symbols)} symbols fetched successfully")
    return True
def test_update_database():
    """Test updating the database with fetched quotes.

    After an interactive confirmation, fetches the last three days of daily
    AAPL quotes from Yahoo Finance and writes them to the ``quotes`` table of
    the database at ``DB_PATH``: a row whose symbol and calendar date already
    exist is updated, otherwise a new row with a fresh UUID is inserted.
    All writes happen in one transaction.

    Returns:
        False when the user cancels, stdin is not interactive, the database
        file is missing, yfinance cannot be imported, or no data is
        returned; True after the quotes have been committed.

    Raises:
        Exception: errors from the fetch or from SQLite propagate to the
            caller; pending writes are rolled back and the connection is
            closed first.
    """
    import uuid  # stdlib; used only here, so imported locally

    print("\n" + "="*60)
    print("TEST 3: Update database with quotes")
    print("="*60)
    print("[!] WARNING: This test will modify the database!")
    print("Press Enter to continue, or Ctrl+C to cancel...")
    try:
        input()
    except KeyboardInterrupt:
        print("\n[X] Test cancelled by user")
        return False
    except EOFError:
        # stdin is closed or not a terminal: nobody confirmed, so do not write
        print("\n[X] No interactive input available; database left untouched")
        return False
    if not DB_PATH or not os.path.exists(DB_PATH):
        print(f"[X] ERROR: Database not found at {DB_PATH}")
        return False
    # Test with AAPL only
    test_symbol = "AAPL"
    end_date = datetime.now().strftime("%Y-%m-%d")
    start_date = (datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d")
    print(f"\nSymbol: {test_symbol}")
    print(f"Period: {start_date} to {end_date}")
    # Import yfinance lazily so a missing install is reported cleanly
    try:
        import yfinance as yf
    except ImportError:
        print("[X] ERROR: yfinance not installed. Run 'uv sync' first.")
        return False
    # Fetch data
    print("\n[FETCH] Fetching data from Yahoo Finance...")
    ticker = yf.Ticker(test_symbol)
    hist = ticker.history(start=start_date, end=end_date, interval="1d")
    if hist.empty:
        print(f"[X] No data available for {test_symbol}")
        return False
    print(f"[OK] Retrieved {len(hist)} quotes")
    # Update database
    print("\n[DB] Updating database...")
    quotes_added = 0
    quotes_updated = 0
    conn = sqlite3.connect(DB_PATH)
    try:
        # `with conn` commits when the block succeeds and rolls back if any
        # statement or value conversion raises, so a failure part-way
        # through cannot leave a half-applied update behind.
        with conn:
            c = conn.cursor()
            for date, row in hist.iterrows():
                # NOTE(review): the index value is formatted as-is and
                # labelled "Z"; if the index is timezone-aware and not UTC
                # the stored string is local wall-clock time - confirm this
                # is what the quotes table expects.
                timestamp = date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
                # Values are stored as strings, volume as a whole number
                ohlcv = (
                    str(row['Open']), str(row['High']), str(row['Low']),
                    str(row['Close']), str(int(row['Volume'])),
                )
                # Check if a quote for this symbol and calendar day exists
                c.execute("""
                    SELECT id FROM quotes
                    WHERE symbol = ? AND date(timestamp) = date(?)
                """, (test_symbol, timestamp))
                if c.fetchone():
                    # Update
                    c.execute("""
                        UPDATE quotes
                        SET open = ?, high = ?, low = ?, close = ?, volume = ?
                        WHERE symbol = ? AND date(timestamp) = date(?)
                    """, ohlcv + (test_symbol, timestamp))
                    quotes_updated += 1
                else:
                    # Insert; a new id is only needed for new rows
                    quote_id = str(uuid.uuid4())
                    c.execute("""
                        INSERT INTO quotes (id, symbol, timestamp, open, high, low, close, volume)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    """, (quote_id, test_symbol, timestamp) + ohlcv)
                    quotes_added += 1
    finally:
        conn.close()
    print("[OK] Database updated successfully!")
    print(f" - Added: {quotes_added} quotes")
    print(f" - Updated: {quotes_updated} quotes")
    return True
def main():
    """Run all tests and print a pass/fail summary.

    Verifies that ``DB_PATH`` is set and points to an existing file, runs
    each test function in order, treats an exception raised by a test as a
    failure of that test, and prints one result line per test.

    Raises:
        SystemExit: with status 1 when ``DB_PATH`` is unset, the database
            file does not exist, or at least one test failed, so that
            shells and CI jobs can detect an unsuccessful run.
    """
    print("\n[TEST] Testing fetch_and_update_quotes functionality")
    print("="*60)
    # Check if database is configured
    if not DB_PATH:
        print("[X] ERROR: DB_PATH environment variable not set")
        print("Please set it in your .env file or environment")
        sys.exit(1)
    if not os.path.exists(DB_PATH):
        print(f"[X] ERROR: Database not found at {DB_PATH}")
        sys.exit(1)
    print(f"[OK] Database found at: {DB_PATH}")
    # Run tests
    tests = [
        ("Fetch quotes for specific symbols", test_fetch_quotes_for_specific_symbols),
        ("Fetch quotes from portfolio", test_fetch_quotes_from_portfolio),
        ("Update database with quotes", test_update_database),
    ]
    results = []
    for test_name, test_func in tests:
        try:
            result = test_func()
        except Exception as e:
            # Top-level boundary: one crashing test must not stop the others
            print(f"\n[X] Test failed with exception: {str(e)}")
            result = False
        results.append((test_name, result))
    # Summary
    print("\n" + "="*60)
    print("TEST SUMMARY")
    print("="*60)
    passed = sum(1 for _, result in results if result)
    total = len(results)
    for test_name, result in results:
        status = "[OK] PASSED" if result else "[X] FAILED"
        print(f"{status}: {test_name}")
    print(f"\n{passed}/{total} tests passed")
    if passed == total:
        print("[SUCCESS] All tests passed!")
    else:
        print("[WARNING] Some tests failed")
        # Report failure through the exit status as well as the text output
        sys.exit(1)
if __name__ == "__main__":
    # Run the suite only when executed as a script. A Ctrl+C that escapes
    # the individual tests is reported here and ends the process with
    # exit status 1.
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n[X] Tests interrupted by user")
        sys.exit(1)