"""SQL query tools for market data analysis using DuckDB."""
from typing import Any
from ..client import SchwabClient
from ..storage import get_storage
from . import history, options
# JSON Schema-style description of the arguments accepted by the
# load_price_history tool. Only "symbol" is mandatory; every other
# property is optional and, where noted, carries a declared default.
LOAD_PRICE_HISTORY_SCHEMA = {
    "type": "object",
    "properties": {
        # Ticker to load
        "symbol": {
            "type": "string",
            "description": "Stock ticker symbol",
        },
        # Look-back window: unit and count
        "period_type": {
            "type": "string",
            "enum": ["day", "month", "year", "ytd"],
            "description": "Type of period (default: year)",
            "default": "year",
        },
        "period": {
            "type": "integer",
            "description": "Number of periods (default: 1)",
            "default": 1,
        },
        # Candle granularity: unit and interval
        "frequency_type": {
            "type": "string",
            "enum": ["minute", "daily", "weekly", "monthly"],
            "description": "Frequency of data points (default: daily)",
            "default": "daily",
        },
        "frequency": {
            "type": "integer",
            "description": "Frequency interval (default: 1)",
            "default": 1,
        },
        # Explicit date bounds
        "start_date": {
            "type": "string",
            "description": "Start date (YYYY-MM-DD), alternative to period",
        },
        "end_date": {
            "type": "string",
            "description": "End date (YYYY-MM-DD)",
        },
        "extended_hours": {
            "type": "boolean",
            "description": "Include extended hours data (default: false)",
            "default": False,
        },
    },
    "required": ["symbol"],
}
# JSON Schema-style description of the arguments accepted by the
# load_option_chain tool. Only "symbol" is mandatory; the remaining
# properties narrow which contracts are requested.
LOAD_OPTION_CHAIN_SCHEMA = {
    "type": "object",
    "properties": {
        "symbol": {
            "type": "string",
            "description": "Underlying stock symbol",
        },
        "contract_type": {
            "type": "string",
            "enum": ["CALL", "PUT", "ALL"],
            "description": "Type of options to retrieve",
            "default": "ALL",
        },
        "strike_count": {
            "type": "integer",
            "description": "Number of strikes above and below ATM (default: all strikes)",
        },
        # Expiration window
        "from_date": {
            "type": "string",
            "description": "Start date for expirations (YYYY-MM-DD)",
        },
        "to_date": {
            "type": "string",
            "description": "End date for expirations (YYYY-MM-DD)",
        },
    },
    "required": ["symbol"],
}
# JSON Schema-style description of the arguments accepted by the
# query_market_data tool: a single mandatory SQL string.
QUERY_MARKET_DATA_SCHEMA = {
    "type": "object",
    "properties": {
        "sql": {
            "type": "string",
            # Adjacent literals are concatenated into one description string.
            "description": (
                "SQL SELECT query to run against loaded market data. "
                "Available tables: 'price_history' (OHLCV data) "
                "and 'options' (option chain with Greeks). "
                "Only SELECT queries are allowed."
            ),
        },
    },
    "required": ["sql"],
}
# JSON Schema-style description of the arguments accepted by the
# get_data_schema tool: an object with no properties and nothing required.
GET_DATA_SCHEMA_SCHEMA = {
    "type": "object",
    "properties": {},
    "required": [],
}
async def load_price_history(client: SchwabClient, args: dict) -> dict[str, Any]:
    """
    Load a symbol's price history into DuckDB so it can be queried with SQL.

    The fetch is skipped when storage reports a still-valid cache entry for
    the symbol (documented as a 1 hour TTL; the TTL itself is decided by the
    storage layer, not by this function).

    Args:
        client: SchwabClient instance used to fetch fresh data
        args: Tool arguments with symbol and optional date/period params

    Returns:
        Dict with load status and record count. Cache hits additionally
        carry TTL/timestamp fields; fresh loads carry a "date_range".
    """
    ticker = args.get("symbol", "").upper()
    store = get_storage()

    # NOTE(review): the lookup is made with the table name and symbol only,
    # so a request with different period/frequency args looks like it would
    # still be answered from cache — confirm against storage.is_cache_valid.
    cached = store.is_cache_valid("price_history", ticker)
    if cached.get("valid"):
        cached_count = cached["record_count"]
        ttl_left = cached["ttl_remaining_seconds"]
        return {
            "success": True,
            "symbol": ticker,
            "cached": True,
            "records_loaded": cached_count,
            "ttl_remaining_seconds": ttl_left,
            "loaded_at": cached["loaded_at"],
            "expires_at": cached["expires_at"],
            "message": (
                f"Using cached data for {ticker} ({cached_count} candles, "
                f"expires in {ttl_left}s). Use query_market_data to analyze."
            ),
        }

    # Cache miss: pull fresh candles through the history tool.
    fetched = await history.get_price_history(client, args)
    fetched_symbol = fetched["symbol"]
    candles = fetched["candles"]

    # Record the request parameters echoed back by the fetch alongside the data.
    request_params = {
        key: fetched.get(key)
        for key in ("period_type", "period", "frequency_type", "frequency")
    }
    stored = store.store_price_history(
        symbol=fetched_symbol,
        candles=candles,
        params=request_params,
    )

    # First/last candle timestamps, or None for an empty result.
    if candles:
        first_ts = candles[0]["datetime"]
        last_ts = candles[-1]["datetime"]
    else:
        first_ts = last_ts = None

    return {
        "success": True,
        "symbol": fetched_symbol,
        "cached": False,
        "records_loaded": stored,
        "date_range": {"start": first_ts, "end": last_ts},
        "message": f"Loaded {stored} candles for {fetched_symbol}. Use query_market_data to analyze.",
    }
async def load_option_chain(client: SchwabClient, args: dict) -> dict[str, Any]:
    """
    Load a symbol's option chain into DuckDB so it can be queried with SQL.

    The fetch is skipped when storage reports a still-valid cache entry for
    the symbol (documented as a 1 hour TTL; the TTL itself is decided by the
    storage layer, not by this function).

    Args:
        client: SchwabClient instance used to fetch fresh data
        args: Tool arguments with symbol and optional filters

    Returns:
        Dict with load status and record count. Cache hits additionally
        carry TTL/timestamp fields; fresh loads carry the underlying price
        and per-side contract counts.
    """
    ticker = args.get("symbol", "").upper()
    store = get_storage()

    # NOTE(review): the lookup is made with the table name and symbol only,
    # so a request with different filters looks like it would still be
    # answered from cache — confirm against storage.is_cache_valid.
    cached = store.is_cache_valid("options", ticker)
    if cached.get("valid"):
        cached_count = cached["record_count"]
        ttl_left = cached["ttl_remaining_seconds"]
        return {
            "success": True,
            "symbol": ticker,
            "cached": True,
            "records_loaded": cached_count,
            "ttl_remaining_seconds": ttl_left,
            "loaded_at": cached["loaded_at"],
            "expires_at": cached["expires_at"],
            "message": (
                f"Using cached data for {ticker} ({cached_count} contracts, "
                f"expires in {ttl_left}s). Use query_market_data to analyze."
            ),
        }

    # Cache miss: pull a fresh chain through the options tool.
    chain = await options.get_option_chain(client, args)
    underlying = chain["symbol"]
    spot = chain.get("underlying_price")

    # The filters stored with the data come from the caller's arguments.
    filters = {
        "contract_type": args.get("contract_type", "ALL"),
        "strike_count": args.get("strike_count"),
        "from_date": args.get("from_date"),
        "to_date": args.get("to_date"),
    }
    stored = store.store_options(
        underlying_symbol=underlying,
        underlying_price=spot,
        calls=chain.get("calls", []),
        puts=chain.get("puts", []),
        params=filters,
    )

    return {
        "success": True,
        "symbol": underlying,
        "cached": False,
        "underlying_price": spot,
        "records_loaded": stored,
        "calls_count": len(chain.get("calls", [])),
        "puts_count": len(chain.get("puts", [])),
        "message": f"Loaded {stored} option contracts for {underlying}. Use query_market_data to analyze.",
    }
async def query_market_data(args: dict) -> dict[str, Any]:
    """
    Run SQL query against loaded market data.

    Args:
        args: Dict with 'sql' key containing the SQL query

    Returns:
        Query results with columns and data. When no usable query is given
        (missing, empty, whitespace-only, or not a string) a failure payload
        is returned without touching storage. When the query fails or yields
        zero rows, an "available_data" entry is added to the result.
    """
    sql = args.get("sql", "")
    # A blank or non-string value is not a query; reject it up front with the
    # same payload as a missing one instead of handing it to the storage layer.
    if not isinstance(sql, str) or not sql.strip():
        return {
            "success": False,
            "error": "No SQL query provided",
            "columns": [],
            "row_count": 0,
            "data": [],
        }

    storage = get_storage()
    try:
        result = storage.query(sql)
    except ValueError as e:
        # ValueError carries security validation failures; report it in the
        # same shape as a normal (failed) query result.
        result = {
            "success": False,
            "error": str(e),
            "columns": [],
            "row_count": 0,
            "data": [],
        }

    # Add available data info if query failed or returned no results
    if not result.get("success") or result.get("row_count", 0) == 0:
        result["available_data"] = storage.get_available_data()
    return result
async def get_data_schema(args: dict) -> dict[str, Any]:
    """
    Describe the queryable tables together with what is currently loaded.

    Args:
        args: Unused (no parameters required)

    Returns:
        Everything reported by storage.get_schema_info(), plus a
        "loaded_data" entry holding storage.get_available_data().
    """
    store = get_storage()
    # Copy the schema description, then attach the loaded-data summary.
    payload = {**store.get_schema_info()}
    payload["loaded_data"] = store.get_available_data()
    return payload