"""
Schwab MCP Server - Main entry point
Exposes Schwab API data to Claude via Model Context Protocol.
READ-ONLY: No trading functionality.
"""
import asyncio
import json
import logging
import sys
from typing import Any
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool, ToolAnnotations
from .auth import TokenManager
from .client import SchwabClient
from .config import settings
from .tools import account, history, instruments, movers, options, query, quotes
# Configure logging - use default level initially, don't call settings() at import time
# to avoid blocking MCP startup if credentials are missing
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
stream=sys.stderr,
)
logger = logging.getLogger(__name__)
# Global instances (initialized lazily)
_token_manager: TokenManager | None = None
_schwab_client: SchwabClient | None = None
def get_token_manager() -> TokenManager:
"""Get or create TokenManager instance."""
global _token_manager
if _token_manager is None:
cfg = settings()
# Validate credentials are present before creating token manager
cfg.validate_credentials()
_token_manager = TokenManager(
client_id=cfg.schwab_client_id,
client_secret=cfg.schwab_client_secret,
token_path=cfg.schwab_token_path,
)
return _token_manager
def get_schwab_client() -> SchwabClient:
"""Get or create SchwabClient instance."""
global _schwab_client
if _schwab_client is None:
cfg = settings()
_schwab_client = SchwabClient(
token_manager=get_token_manager(),
timeout=cfg.schwab_timeout,
)
return _schwab_client
# Initialize MCP server
server = Server("schwab-mcp")
@server.list_tools()
async def list_tools() -> list[Tool]:
"""List all available tools."""
return [
Tool(
name="get_positions",
description="Get all positions with cost basis, quantity, market value, and gain/loss for an account",
inputSchema=account.GET_POSITIONS_SCHEMA,
),
# NOTE: get_account is intentionally NOT exposed for now
Tool(
name="get_quote",
description="Get real-time quote for a stock symbol including price, bid/ask, volume, and fundamentals",
inputSchema=quotes.GET_QUOTE_SCHEMA,
),
Tool(
name="get_quotes",
description="Get real-time quotes for multiple symbols at once",
inputSchema=quotes.GET_QUOTES_SCHEMA,
),
# NOTE: get_option_chain and get_price_history are intentionally NOT exposed
# as tools because they return large datasets that can max out context windows.
# Use load_option_chain/load_price_history + query_market_data instead.
Tool(
name="get_instruments",
description="Search for instruments by symbol or description, or get fundamental data for a symbol",
inputSchema=instruments.GET_INSTRUMENTS_SCHEMA,
),
Tool(
name="get_movers",
description="Get top movers (gainers or losers) for an index by percent or value change",
inputSchema=movers.GET_MOVERS_SCHEMA,
),
# SQL query tools for large datasets
Tool(
name="load_price_history",
description=(
"Fetch price history and load into SQL database for querying. "
"Use this instead of get_price_history when you need to analyze the data with SQL. "
"After loading, use query_market_data to run SQL queries."
),
inputSchema=query.LOAD_PRICE_HISTORY_SCHEMA,
),
Tool(
name="load_option_chain",
description=(
"Fetch option chain and load into SQL database for querying. "
"Use this instead of get_option_chain when you need to analyze the data with SQL. "
"After loading, use query_market_data to run SQL queries."
),
inputSchema=query.LOAD_OPTION_CHAIN_SCHEMA,
),
Tool(
name="query_market_data",
description=(
"Run SQL query against loaded market data (price_history and options tables). "
"First use load_price_history or load_option_chain to load data, then query it. "
"Only SELECT queries are allowed. "
"Use get_data_schema to see table schemas and example queries."
),
inputSchema=query.QUERY_MARKET_DATA_SCHEMA,
),
Tool(
name="get_data_schema",
description=(
"Get schema information for market data tables including column definitions, "
"available data, and example SQL queries for common analyses like volume profile."
),
inputSchema=query.GET_DATA_SCHEMA_SCHEMA,
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""Handle tool invocations."""
logger.info(f"Tool called: {name}")
logger.debug(f"Arguments: {arguments}")
client = get_schwab_client()
handlers: dict[str, Any] = {
"get_positions": lambda args: account.get_positions(client, args),
"get_account": lambda args: account.get_account(client, args),
"get_quote": lambda args: quotes.get_quote(client, args),
"get_quotes": lambda args: quotes.get_quotes(client, args),
"get_option_chain": lambda args: options.get_option_chain(client, args),
"get_price_history": lambda args: history.get_price_history(client, args),
"get_instruments": lambda args: instruments.get_instruments(client, args),
"get_movers": lambda args: movers.get_movers(client, args),
# SQL query tools
"load_price_history": lambda args: query.load_price_history(client, args),
"load_option_chain": lambda args: query.load_option_chain(client, args),
"query_market_data": lambda args: query.query_market_data(args),
"get_data_schema": lambda args: query.get_data_schema(args),
}
handler = handlers.get(name)
if not handler:
raise ValueError(f"Unknown tool: {name}")
try:
result = await handler(arguments)
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except ValueError as e:
# Catch credential/token errors with helpful message
logger.error(f"Tool {name} failed: {e}", exc_info=True)
error_response = {
"error": True,
"error_type": "ConfigurationError",
"message": str(e),
"help": "Run 'python get_token.py' to authenticate with Schwab first.",
}
return [TextContent(type="text", text=json.dumps(error_response, indent=2))]
except Exception as e:
logger.error(f"Tool {name} failed: {e}", exc_info=True)
error_response = {
"error": True,
"error_type": type(e).__name__,
"message": str(e),
}
return [TextContent(type="text", text=json.dumps(error_response, indent=2))]
async def run_server():
"""Run the MCP server."""
logger.info("Starting Schwab MCP Server...")
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options(),
)
def main():
"""Entry point for the server."""
asyncio.run(run_server())
if __name__ == "__main__":
main()