"""
FastMCP MCP server entry point, packaged under ibkr_mcp.
"""
import asyncio
import os
from datetime import datetime, timedelta, time as dt_time
from zoneinfo import ZoneInfo
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, AsyncIterator, Dict, List, Optional
from dotenv import load_dotenv
from ib_async import IB, Contract, Stock, Forex, Future, Option
from loguru import logger
from mcp.server.fastmcp import FastMCP
from starlette.middleware.cors import CORSMiddleware
from ibkr_mcp.core.positions import PositionLive
from ibkr_mcp.services import DataEnrichmentService, PortfolioManager, NewsService, OptionDataService, RiskService
from ibkr_mcp.services.option_chain import OptionChainService
from ibkr_mcp.services.stock_data import StockDataService
from ibkr_mcp.services.strategy_analysis import StrategyAnalysisService
from ibkr_mcp.database.repositories import init_database
# Load environment variables from a local .env file (no-op if the file is absent).
load_dotenv()

# IB Gateway / TWS connection settings. Port 4001 is the IB Gateway live default;
# client id 0 is the "master" client id in IBKR terminology.
IBKR_HOST = os.getenv("IBKR_HOST", "127.0.0.1")
IBKR_PORT = int(os.getenv("IBKR_PORT", "4001"))
IBKR_CLIENT_ID = int(os.getenv("IBKR_CLIENT_ID", "0"))
# Optional account id; empty string lets ib_async use the connected account.
IBKR_ACCOUNT = os.getenv("IBKR_ACCOUNT", "")

# On-disk locations for option/stock snapshots and historical data.
OPTION_DATA_DIR = Path(os.getenv("IBKR_MCP_OPTION_DATA_DIR", "optiondata"))
OPTION_HISTORY_DIR = Path(os.getenv("IBKR_MCP_OPTION_HISTORY_DIR", "historydata"))
# IBKR market data type requested by default (e.g. LIVE / FROZEN / DELAYED).
DEFAULT_MARKET_DATA_TYPE = os.getenv("IBKR_MCP_MARKET_DATA_TYPE", "FROZEN")
class IBKRContext:
    """
    Async context manager for IBKR lifecycle.

    Implements async context manager protocol (__aenter__ and __aexit__)
    for automatic resource initialization and cleanup.
    """

    def __init__(self, ib: IB) -> None:
        """
        Initialize IBKRContext with synchronous operations only.

        Args:
            ib: IB connection instance (required)

        Initializes:
            - portfolio_manager: PortfolioManager instance
            - risk_service: RiskService instance
            - news_service: NewsService instance
            - position_live: PositionLive instance (initialized)
            - data_enrichment_service: DataEnrichmentService instance (technical indicators, market state)
            - option_data_service: OptionDataService instance (JSON-based, legacy)
            - option_chain_service: OptionChainService instance (flattened model)
            - stock_data_service: StockDataService instance (flattened stock data)
        """
        self.ib = ib
        # 1. Initialize external services
        self.portfolio_manager = PortfolioManager(self.ib)
        self.risk_service = RiskService(self.ib)
        self.news_service = NewsService(self.ib)
        # 2. Initialize real-time data component
        self.position_live = PositionLive(self.ib)
        self.position_live.initialize()
        # 3. Initialize flattened model services (needed by data_enrichment_service)
        self.option_chain_service = OptionChainService(self.ib)
        self.stock_data_service = StockDataService(
            ib=self.ib,
            data_dir=OPTION_DATA_DIR / "stocks",
            history_dir=OPTION_HISTORY_DIR,
            market_data_type=DEFAULT_MARKET_DATA_TYPE,
        )
        # 4. Initialize data enrichment service (uses stock_data_service)
        self.data_enrichment_service = DataEnrichmentService(
            stock_data_service=self.stock_data_service,
        )
        # 5. Construct OptionDataService with constructor injection
        # Pass position_store and enrichment_service directly in constructor
        self.option_data_service = OptionDataService(
            self.ib,
            position_store=self.position_live,  # Constructor injection
            data_dir=OPTION_DATA_DIR,
            history_dir=OPTION_HISTORY_DIR,
            default_market_data_type=DEFAULT_MARKET_DATA_TYPE,
            enrichment_service=self.data_enrichment_service,  # Inject enrichment service
        )
        # 6. Initialize strategy analysis service
        self.strategy_analysis_service = StrategyAnalysisService(
            option_data_service=self.option_data_service,
        )
        # 7. Scheduler state (internal)
        self._scheduler_task: Optional[asyncio.Task] = None
        self._schedule_times: List[dt_time] = []
        print("✅ All services initialized")

    async def __aenter__(self) -> 'IBKRContext':
        """
        Async resource acquisition.

        Returns:
            IBKRContext: self

        Operations:
            1. Database initialization (async)
            2. Start unified scheduler (background task)

        Note:
            Both steps are best-effort: failures are reported but do not
            prevent the context from being entered.
        """
        # 1. Database initialization
        try:
            await init_database()
            print("✅ Database initialized")
        except Exception as e:
            print(f"⚠️ Database initialization failed: {e}")
        # 2. Start unified scheduler
        try:
            await self.start_scheduler()
            print("✅ Scheduler started")
        except Exception as e:
            print(f"⚠️ Scheduler initialization failed: {e}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Unified resource cleanup in reverse initialization order.

        Cleanup order:
            1. Stop scheduler (stop event source)
            2. Cleanup PositionLive (unsubscribe events)
            3. Disconnect IB (final step)
        """
        # 1. Stop scheduler
        await self.stop_scheduler()
        print("Scheduler stopped")
        # 2. Cleanup PositionLive
        if self.position_live:
            self.position_live.cleanup()
            print("PositionLive cleaned up")
        # 3. Disconnect IB
        # Intentionally left disabled: the IB socket is owned and closed by
        # ibkr_lifespan's finally block, not by this context.
        # self.ib.disconnect()
        # print("IB connection closed")

    async def start_scheduler(self) -> None:
        """
        Start the minimal scheduler.

        Schedule times configured via SCHEDULE_TIMES env var.
        Format: "HH:MM,HH:MM" (e.g., "07:00,16:00")
        Default: "07:00,16:00" (before/after US market)
        """
        # Guard against double-start: an existing live task keeps running.
        if self._scheduler_task and not self._scheduler_task.done():
            logger.warning("Scheduler is already running")
            return
        schedule_times_str = os.getenv("SCHEDULE_TIMES", "07:00,16:00")
        self._schedule_times = self._parse_schedule_times(schedule_times_str)
        logger.info(f"Schedule configured for times: {[t.strftime('%H:%M') for t in self._schedule_times]}")
        # Run the loop as a background task so __aenter__ returns immediately.
        self._scheduler_task = asyncio.create_task(self._scheduler_loop())
        logger.info("Scheduler started")

    async def stop_scheduler(self) -> None:
        """Stop scheduler and cancel background task. Safe to call when not running."""
        if self._scheduler_task and not self._scheduler_task.done():
            self._scheduler_task.cancel()
            try:
                # Await so cancellation is fully processed before returning;
                # CancelledError here is the expected outcome.
                await self._scheduler_task
            except asyncio.CancelledError:
                pass
        logger.info("Scheduler stopped")

    def _parse_schedule_times(self, times_str: str) -> List[dt_time]:
        """
        Parse schedule times from string.

        Args:
            times_str: Comma-separated times in HH:MM format

        Returns:
            List[dt_time]: Sorted list of schedule times. Invalid entries are
            skipped with a warning; if none parse, falls back to 07:00.
        """
        times: List[dt_time] = []
        for time_str in times_str.split(','):
            try:
                times.append(datetime.strptime(time_str.strip(), "%H:%M").time())
            except ValueError:
                logger.warning(f"Invalid schedule time '{time_str}', skipping")
        return sorted(times) if times else [datetime.strptime("07:00", "%H:%M").time()]

    def _compute_next_run(self, now: datetime) -> datetime:
        """
        Compute next run time from configured schedule times.

        Args:
            now: Current datetime with timezone

        Returns:
            datetime: Next scheduled run time (the earliest candidate that is
            strictly in the future; today's already-passed slots roll to tomorrow)
        """
        candidates = []
        for scheduled_time in self._schedule_times:
            candidate = now.replace(
                hour=scheduled_time.hour,
                minute=scheduled_time.minute,
                second=0,
                microsecond=0,
            )
            if candidate <= now:
                # NOTE(review): timedelta arithmetic on an aware datetime is
                # wall-clock based; around a DST transition the computed slot
                # may shift by an hour — acceptable for this scheduler.
                candidate += timedelta(days=1)
            candidates.append(candidate)
        return min(candidates)

    async def _scheduler_loop(self) -> None:
        """
        Scheduler main loop.

        Execution flow:
            1. Calculate next run time
            2. Sleep until next run time
            3. Execute option data update
            4. Repeat

        Runs until the owning task is cancelled (see stop_scheduler); update
        failures are logged and the loop continues.
        """
        tz = ZoneInfo(os.getenv("SCHEDULE_TIMEZONE", "America/Los_Angeles"))
        while True:
            now = datetime.now(tz)
            next_run = self._compute_next_run(now)
            # Clamp to zero in case the clock moved between the two reads.
            sleep_seconds = max((next_run - now).total_seconds(), 0.0)
            logger.info(f"Next run scheduled at {next_run.isoformat()} (sleeping {sleep_seconds:.2f}s)")
            await asyncio.sleep(sleep_seconds)
            try:
                logger.info("Starting scheduled option data update")
                await self.option_data_service.run_scheduled_update()
                logger.info("Scheduled update completed")
            except Exception as e:
                logger.error(f"Scheduled update failed: {e}")

    @staticmethod
    def _create_contract(
        symbol: str,
        sec_type: str = "STK",
        exchange: str = "SMART",
        currency: str = "USD",
    ) -> Contract:
        """
        Build an ib_async Contract for the given symbol and security type.

        A purely numeric ``symbol`` is treated as an IBKR contract id (conId);
        otherwise the appropriate typed contract is constructed, falling back
        to a generic Contract for unrecognised sec_type values.
        """
        if symbol.isdigit():
            return Contract(conId=int(symbol))
        if sec_type == "STK":
            return Stock(symbol=symbol, exchange=exchange, currency=currency)
        if sec_type in ("FOREX", "CASH"):
            return Forex(pair=symbol)
        if sec_type == "FUT":
            return Future(symbol=symbol, exchange=exchange)
        if sec_type == "OPT":
            # Option expects strike as float, not currency as 3rd arg;
            # keyword arguments avoid that positional pitfall. Strike/expiry
            # are left unset here (underspecified option contract).
            return Option(symbol=symbol, exchange=exchange, currency=currency)
        return Contract(
            symbol=symbol, secType=sec_type, exchange=exchange, currency=currency
        )

    def _flatten_contracts(self, contracts: list[Any]) -> list[Contract]:
        """Recursively flatten nested contract lists into a flat list.

        Non-Contract, non-list entries (e.g. None) are dropped implicitly
        because only Contract instances are appended.
        """
        result: list[Contract] = []
        for c in contracts:
            if isinstance(c, Contract):
                result.append(c)
            elif isinstance(c, list):
                result.extend(self._flatten_contracts(c))
        return result

    @staticmethod
    def _format_markdown_list(items: list[str], ordered: bool = False) -> str:
        """Format items as a markdown list (ordered "1." or bulleted "-").

        Returns an empty string for an empty input list.
        """
        if not items:
            return ""
        if ordered:
            return "\n".join(f"{i+1}. {item}" for i, item in enumerate(items))
        else:
            return "\n".join(f"- {item}" for item in items)

    async def get_portfolio(self, account: str = "") -> list[dict]:
        """
        Get portfolio for a specific account or the connected account.

        Returns a list of portfolio position dictionaries.
        """
        # NOTE(review): not awaited — assumes PortfolioManager.get_portfolio
        # is synchronous; confirm against its definition.
        return self.portfolio_manager.get_portfolio(account)

    async def get_greeks_summary(self, account: str = "") -> Dict[str, Any]:
        """
        Load current positions and return a portfolio greeks summary.

        This method:
        - fetches positions for the given account from IBKR
        - computes option greeks per underlying
        - aggregates portfolio-wide greeks totals
        - computes a simple concentration metric by gross exposure
        """
        return await self.risk_service.get_greeks_summary(account)

    async def evaluate_portfolio_risk(
        self,
        account: str = "",
        config: Optional[Dict[str, Any]] = None,
        config_path: str = "risk.yaml",
    ) -> Dict[str, Any]:
        """
        Load positions, compute greeks and concentration, and evaluate risk limits.

        The risk configuration can be provided directly as a dict (via MCP tool
        arguments) or loaded from a YAML file. Lookup order is:
        - the explicitly provided ``config`` argument, if not None
        - the explicitly provided ``config_path`` argument, if not empty
        - the IBKR_MCP_RISK_CONFIG environment variable
        - a local "risk.yaml" file in the working directory
        """
        # Thin delegation: config resolution happens inside RiskService.
        return await self.risk_service.evaluate_portfolio_risk(
            account=account,
            config=config,
            config_path=config_path,
        )

    async def generate_playbook_actions(
        self,
        account: str = "",
        config: Optional[Dict[str, Any]] = None,
        config_path: str = "risk.yaml",
    ) -> Dict[str, Any]:
        """
        Generate strategy playbook actions based on current positions and risk rules.

        This reuses the same risk configuration resolution as evaluate_portfolio_risk
        and applies PlaybookEngine to derive human-readable adjustment suggestions.
        """
        return await self.risk_service.generate_playbook_actions(
            account=account,
            config=config,
            config_path=config_path,
        )

    async def get_historical_news(
        self,
        symbol: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        max_count: int = 10,
        exchange: str = "SMART",
        currency: str = "USD",
        conid: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Get historical news for a stock by symbol or conid between two dates.

        Mirrors the behaviour of the original news tool, but centralised here.
        """
        return await self.news_service.get_historical_news(
            symbol=symbol,
            start_date=start_date,
            end_date=end_date,
            max_count=max_count,
            exchange=exchange,
            currency=currency,
            conid=conid,
        )

    async def get_option_chains(
        self,
        symbols: Optional[list[str]] = None,
        mode: str = "live",
        market_data_type: Optional[str] = None,
        persist: bool = True,
    ) -> Dict[str, Any]:
        """
        Fetch option chain snapshots either via live IBKR data or local cache.

        A ``symbols`` value of None is forwarded as an empty list.
        """
        return await self.option_data_service.fetch_option_chains(
            symbols or [],
            mode=mode,
            market_data_type=market_data_type,
            persist=persist,
        )

    def _get_scheduled_symbols(self) -> List[str]:
        """
        Get symbols from config or positions for scheduled updates.

        Returns:
            List of symbols (uppercase, sorted). Sources, in order:
            1. the "schedule.symbols" list in a local config.yaml, if present
            2. underlyings of currently held positions
            3. an empty list if both lookups fail
        """
        # Try to load from config
        try:
            import yaml
            config_path = Path("config.yaml")
            if config_path.exists():
                with config_path.open("r", encoding="utf-8") as f:
                    config = yaml.safe_load(f) or {}
                schedule_config = config.get("schedule", {})
                symbols = schedule_config.get("symbols", [])
                if symbols:
                    return sorted(set(s.upper() for s in symbols))
        except Exception as e:
            logger.warning(f"Failed to load config symbols: {e}")
        # Fallback: get from positions
        try:
            # assumes PositionLive exposes .positions whose items carry an
            # .underlying attribute — TODO confirm against PositionLive.
            positions = self.position_live.positions
            symbols = sorted(set(p.underlying for p in positions if p.underlying))
            logger.debug(f"Using {len(symbols)} symbols from positions")
            return symbols
        except Exception as e:
            logger.error(f"Failed to get symbols from positions: {e}")
            return []
@asynccontextmanager
async def ibkr_lifespan(_server: FastMCP) -> AsyncIterator[IBKRContext]:
    """
    Simplified lifecycle manager - handles IB connection only.

    Yields:
        IBKRContext: Initialized context with all services

    Raises:
        Exception: If IB connection fails
    """
    ib_client = IB()
    try:
        # Open the socket to TWS / IB Gateway with environment-driven settings.
        await ib_client.connectAsync(
            IBKR_HOST,
            IBKR_PORT,
            clientId=IBKR_CLIENT_ID,
            account=IBKR_ACCOUNT
        )
        print(f"✅ Connected to IBKR at {IBKR_HOST}:{IBKR_PORT}")
        # IBKRContext wires all services on entry and tears them down on exit;
        # the server runs for as long as the context stays yielded.
        async with IBKRContext(ib=ib_client) as context:
            yield context
    except Exception as e:
        print(f"❌ Failed to connect to IBKR: {e}")
        raise
    finally:
        # Always sever the IB socket, whether startup succeeded or not.
        if ib_client.isConnected():
            ib_client.disconnect()
        print("🔚 IBKR lifespan ended")
# Create an MCP server; ibkr_lifespan owns the IB connection and the
# IBKRContext service bundle for the server's whole lifetime.
mcp = FastMCP(
    name="ibkr-mcp",
    lifespan=ibkr_lifespan,
)
async def run_http_with_cors() -> None:
    """
    Run the FastMCP HTTP server with CORS enabled so that browsers can
    successfully perform OPTIONS /mcp/ preflight requests.
    """
    import uvicorn

    # Streamable HTTP ASGI application produced by FastMCP.
    http_app = mcp.streamable_http_app()

    # Origins come from the environment; "*" (the default, for local use)
    # allows everything, otherwise a comma-separated whitelist is parsed.
    origins_setting = os.getenv("CORS_ALLOW_ORIGINS", "*")
    if origins_setting == "*":
        allow_origins = ["*"]
    else:
        allow_origins = [
            entry.strip()
            for entry in origins_setting.split(",")
            if entry.strip()
        ]

    # The CORS middleware answers OPTIONS /mcp/ preflights and decorates
    # responses with the appropriate Access-Control-* headers.
    http_app.add_middleware(
        CORSMiddleware,
        allow_origins=allow_origins,
        allow_methods=["*"],
        allow_headers=["*"],
        allow_credentials=False,
    )

    # Host/port are environment-driven instead of relying on FastMCP
    # settings that older mcp releases may not expose.
    server_config = uvicorn.Config(
        http_app,
        host=os.getenv("HOST", "0.0.0.0"),
        port=int(os.getenv("PORT", "8050")),
        log_level=mcp.settings.log_level.lower(),
    )
    await uvicorn.Server(server_config).serve()
async def serve() -> None:
    """
    Async entry point for running the MCP server.

    The transport is selected via the TRANSPORT environment variable
    ("stdio" by default). The value is normalized (whitespace stripped,
    lowercased) so values such as "HTTP" or " stdio " also work.

    Raises:
        ValueError: If TRANSPORT is neither "http" nor "stdio".
    """
    # Normalize so casing/whitespace in the env var cannot break startup.
    transport = os.getenv("TRANSPORT", "stdio").strip().lower()
    if transport == 'http':
        # Run the MCP server with streamable HTTP transport (CORS-enabled)
        await run_http_with_cors()
    elif transport == 'stdio':
        # Run the MCP server with stdio transport (typical for local clients)
        await mcp.run_stdio_async()
    else:
        raise ValueError(f"Unsupported transport: {transport}. Use 'http' or 'stdio'.")