Skip to main content
Glama

IBKR TWS MCP Server

by haymant
DetailedImplementationandTestingPlanforIBKRTWSMCPServerE2ECase.md21.9 kB
# Detailed Implementation and Testing Plan for IBKR TWS MCP Server E2E Case This document provides a comprehensive, step-by-step plan for implementing and testing the IBKR TWS MCP server to support the portfolio rebalancing e2e case described in `Pasted_content_02.txt`. The plan is structured into phases, each with specific tasks, deliverables, and acceptance criteria. ## Overview The goal is to build a production-ready MCP server that exposes IBKR TWS API functionalities as MCP tools, enabling automated portfolio rebalancing workflows. The implementation will use the official `modelcontextprotocol/python-sdk`, `ib_insync` for TWS API interaction, and `uv` for Python environment management. The server will support HTTP streaming (SSE), be containerized with Docker, and include comprehensive unit and integration tests. ## Phase 1: Project Setup and Environment Configuration ### Objectives - Set up the project structure using `uv` - Configure development environment - Install and verify dependencies ### Tasks #### 1.1 Initialize Project with `uv` ```bash # Create project directory mkdir ibkr-tws-mcp-server cd ibkr-tws-mcp-server # Initialize uv project uv init # Add dependencies uv add "mcp[cli]" ``` ib-insync ```bash starlette uvicorn uv add --dev pytest pytest-asyncio pytest-mock ``` #### 1.2 Create Project Structure ``` ibkr-tws-mcp-server/ ├── src/ │ ├── __init__.py │ ├── server.py # Main MCP server implementation │ ├── tws_client.py # TWS client wrapper │ ├── models.py # Pydantic models for requests/responses │ └── config.py # Configuration management ├── tests/ │ ├── __init__.py │ ├── unit/ │ │ ├── __init__.py │ │ ├── test_tws_client.py │ │ └── test_server_tools.py │ ├── integration/ │ │ ├── __init__.py │ │ └── test_e2e_workflow.py │ └── fixtures/ │ ├── sample_positions.json │ ├── sample_historical_data.json │ └── sample_account_summary.json ├── scripts/ │ ├── run_server.sh │ └── test_curl.sh # cURL test scripts ├── docs/ │ ├── API.md # API documentation │ └── SETUP.md # Setup instructions ├── Dockerfile ├── docker-compose.yml ├── .env.example ├── pyproject.toml ├── uv.lock └── README.md ``` #### 1.3 Configure Environment Variables Create `.env.example`: ```bash # TWS/IB Gateway Connection TWS_HOST=127.0.0.1 TWS_PORT=7496 TWS_CLIENT_ID=1 # Server Configuration SERVER_HOST=0.0.0.0 SERVER_PORT=8000 API_PREFIX=/api/v1 # Logging LOG_LEVEL=INFO ``` ### Deliverables - ✅ Project initialized with `uv` - ✅ All dependencies installed - ✅ Project structure created - ✅ Environment configuration files in place ### Acceptance Criteria - `uv run python --version` executes successfully - All directories and files are created as specified - Dependencies are locked in `uv.lock` --- ## Phase 2: Implement TWS Client Wrapper ### Objectives - Create a robust wrapper around `ib_insync` for TWS API interactions - Implement connection management and error handling - Provide clean interfaces for all required TWS API functions ### Tasks #### 2.1 Implement `tws_client.py` **Key Components:** - `TWSClient` class with async connection management - Methods for all required TWS API operations: - `connect()`, `disconnect()`, `is_connected()` - `get_contract_details()` - `get_historical_data()` - `subscribe_market_data()`, `unsubscribe_market_data()` - `get_account_summary()`, `get_positions()` - `subscribe_account_updates()`, `unsubscribe_account_updates()` - `place_order()`, `cancel_order()` - `get_open_orders()`, `get_executions()` - `get_pnl()`, `get_pnl_single()` **Example Implementation Snippet:** ```python from ib_insync import IB, Stock, MarketOrder, util from typing import List, Dict, Any, Optional import asyncio class TWSClient: def __init__(self): self.ib = IB() self._connected = False async def connect(self, host: str, port: int, client_id: int) -> bool: """Connect to TWS/IB Gateway.""" try: await self.ib.connectAsync(host, port, clientId=client_id) self._connected = True return True except Exception as e: raise ConnectionError(f"Failed to connect to TWS: {str(e)}") async def get_positions(self) -> List[Dict[str, Any]]: """Get current portfolio positions.""" if not self._connected: raise RuntimeError("Not connected to TWS") positions = self.ib.positions() return [ { "account": pos.account, "contract": util.dataclassAsDict(pos.contract), "position": pos.position, "avgCost": pos.avgCost, } for pos in positions ] # ... implement other methods ``` #### 2.2 Implement `models.py` Define Pydantic models for structured data: ```python from pydantic import BaseModel from typing import Optional, List class ContractRequest(BaseModel): symbol: str secType: str = "STK" exchange: str = "SMART" currency: str = "USD" class HistoricalDataRequest(BaseModel): contract: ContractRequest durationStr: str = "1 Y" barSizeSetting: str = "1 day" whatToShow: str = "TRADES" useRTH: int = 1 class OrderRequest(BaseModel): contract: ContractRequest action: str # BUY or SELL totalQuantity: int orderType: str = "MKT" lmtPrice: Optional[float] = None transmit: bool = True # ... more models ``` ### Deliverables - ✅ `tws_client.py` with all required methods - ✅ `models.py` with Pydantic models - ✅ Error handling and logging implemented ### Acceptance Criteria - All TWS API operations are wrapped in clean async methods - Connection management is robust with proper error handling - Code follows Python best practices and type hints --- ## Phase 3: Implement MCP Server with FastMCP ### Objectives - Create the MCP server using `FastMCP` - Define all MCP tools corresponding to TWS API functionalities - Implement lifespan management for TWS client - Mount the server on `/api/v1` prefix ### Tasks #### 3.1 Implement `server.py` **Key Components:** - Lifespan context manager for TWS client initialization - MCP tools decorated with `@mcp.tool()` - HTTP transport configuration with `/api/v1` prefix **Example Implementation:** ```python from mcp.server.fastmcp import FastMCP, Context from mcp.server.session import ServerSession from contextlib import asynccontextmanager from dataclasses import dataclass from typing import AsyncIterator from starlette.applications import Starlette from starlette.routing import Mount from .tws_client import TWSClient from .models import ContractRequest, HistoricalDataRequest, OrderRequest @dataclass class AppContext: """Application context with TWS client.""" tws: TWSClient @asynccontextmanager async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: """Manage TWS client lifecycle.""" tws = TWSClient() # Note: Connection is handled by connect_tws tool, not here try: yield AppContext(tws=tws) finally: if tws.is_connected(): await tws.disconnect() # Create MCP server mcp = FastMCP("IBKR TWS MCP Server", lifespan=app_lifespan) # Define tools @mcp.tool() async def connect_tws( ctx: Context[ServerSession, AppContext], host: str = "127.0.0.1", port: int = 7496, clientId: int = 1 ) -> dict: """Connect to TWS/IB Gateway.""" tws = ctx.request_context.lifespan_context.tws await tws.connect(host, port, clientId) return {"status": "connected", "host": host, "port": port} @mcp.tool() async def get_positions(ctx: Context[ServerSession, AppContext]) -> list: """Get current portfolio positions.""" tws = ctx.request_context.lifespan_context.tws return await tws.get_positions() @mcp.tool() async def get_historical_data( ctx: Context[ServerSession, AppContext], symbol: str, secType: str = "STK", exchange: str = "SMART", currency: str = "USD", durationStr: str = "1 Y", barSizeSetting: str = "1 day", whatToShow: str = "TRADES" ) -> list: """Get historical market data for a contract.""" tws = ctx.request_context.lifespan_context.tws contract = ContractRequest( symbol=symbol, secType=secType, exchange=exchange, currency=currency ) return await tws.get_historical_data( contract, durationStr, barSizeSetting, whatToShow ) @mcp.tool() async def place_order( ctx: Context[ServerSession, AppContext], symbol: str, action: str, totalQuantity: int, orderType: str = "MKT", lmtPrice: float = None, secType: str = "STK", exchange: str = "SMART", currency: str = "USD" ) -> dict: """Place an order.""" tws = ctx.request_context.lifespan_context.tws order_req = OrderRequest( contract=ContractRequest( symbol=symbol, secType=secType, exchange=exchange, currency=currency ), action=action, totalQuantity=totalQuantity, orderType=orderType, lmtPrice=lmtPrice ) return await tws.place_order(order_req) # ... implement all other tools # Mount on /api/v1 app = Starlette( routes=[ Mount('/api/v1', app=mcp.streamable_http_app()), ] ) ``` #### 3.2 Implement Streaming Tools For streaming data (market data, account updates), use the MCP SDK's capabilities: ```python @mcp.tool() async def stream_market_data( ctx: Context[ServerSession, AppContext], symbol: str, secType: str = "STK" ) -> str: """Stream real-time market data (returns subscription ID).""" tws = ctx.request_context.lifespan_context.tws # Start streaming and send updates via context async def send_updates(): async for data in tws.subscribe_market_data(symbol, secType): await ctx.info(f"Market data: {data}") # Start background task asyncio.create_task(send_updates()) return f"Streaming market data for {symbol}" ``` ### Deliverables - ✅ `server.py` with all MCP tools implemented - ✅ Lifespan management for TWS client - ✅ HTTP transport mounted on `/api/v1` ### Acceptance Criteria - All tools are properly decorated and documented - Tools can access TWS client via context - Server can be started and responds to requests --- ## Phase 4: Implement Unit Tests ### Objectives - Write comprehensive unit tests for all components - Achieve >80% code coverage - Use mocking to isolate components ### Tasks #### 4.1 Create Test Fixtures **`tests/fixtures/sample_positions.json`****:** ```json [ { "account": "DU123456", "contract": { "symbol": "VTI", "secType": "STK", "exchange": "ARCA", "currency": "USD" }, "position": 100, "avgCost": 200.50 }, { "account": "DU123456", "contract": { "symbol": "TLT", "secType": "STK", "exchange": "NASDAQ", "currency": "USD" }, "position": 50, "avgCost": 95.30 } ] ``` #### 4.2 Implement Unit Tests **`tests/unit/test_tws_client.py`****:** ```python import pytest from unittest.mock import AsyncMock, MagicMock, patch from src.tws_client import TWSClient @pytest.mark.asyncio async def test_connect_success(): """Test successful connection to TWS.""" client = TWSClient() with patch.object(client.ib, 'connectAsync', new_callable=AsyncMock) as mock_connect: mock_connect.return_value = None result = await client.connect("127.0.0.1", 7496, 1) assert result is True assert client.is_connected() is True mock_connect.assert_called_once_with("127.0.0.1", 7496, clientId=1) @pytest.mark.asyncio async def test_get_positions(load_fixture): """Test getting positions.""" client = TWSClient() client._connected = True # Load fixture data expected_positions = load_fixture('sample_positions.json') with patch.object(client.ib, 'positions') as mock_positions: # Mock the positions response mock_positions.return_value = [...] # Convert fixture to mock objects positions = await client.get_positions() assert len(positions) == 2 assert positions[0]['contract']['symbol'] == 'VTI' ``` **`tests/unit/test_server_tools.py`****:** ```python import pytest from unittest.mock import AsyncMock, MagicMock from src.server import connect_tws, get_positions @pytest.mark.asyncio async def test_connect_tws_tool(): """Test connect_tws MCP tool.""" # Create mock context mock_ctx = MagicMock() mock_tws = AsyncMock() mock_ctx.request_context.lifespan_context.tws = mock_tws result = await connect_tws(mock_ctx, "127.0.0.1", 7496, 1) assert result['status'] == 'connected' mock_tws.connect.assert_called_once_with("127.0.0.1", 7496, 1) ``` #### 4.3 Run Tests and Generate Coverage Report ```bash # Run all unit tests uv run pytest tests/unit/ -v # Run with coverage uv run pytest tests/unit/ --cov=src --cov-report=html --cov-report=term ``` ### Deliverables - ✅ Unit tests for all TWS client methods - ✅ Unit tests for all MCP tools - ✅ Test fixtures for mock data - ✅ Coverage report >80% ### Acceptance Criteria - All unit tests pass - Code coverage is above 80% - Tests are well-documented and maintainable --- ## Phase 5: Implement Integration Tests (SIT) ### Objectives - Create end-to-end integration tests - Test the complete workflow from the e2e case - Validate with cURL commands ### Tasks #### 5.1 Create Integration Test Suite **`tests/integration/test_e2e_workflow.py`****:** ```python import pytest import asyncio from httpx import AsyncClient BASE_URL = "http://localhost:8000/api/v1" @pytest.mark.asyncio async def test_portfolio_rebalancing_workflow(): """Test the complete portfolio rebalancing workflow.""" async with AsyncClient(base_url=BASE_URL) as client: # Step 1: Connect to TWS response = await client.post("/connect_tws", json={ "host": "127.0.0.1", "port": 7496, "clientId": 1 }) assert response.status_code == 200 assert response.json()['status'] == 'connected' # Step 2: Get historical data for optimization response = await client.post("/get_historical_data", json={ "symbol": "VTI", "durationStr": "1 Y" }) assert response.status_code == 200 historical_data = response.json() assert len(historical_data) > 0 # Step 3: Get current positions response = await client.get("/get_positions") assert response.status_code == 200 positions = response.json() # Step 4: Calculate drift (mock calculation) # In real scenario, this would involve optimization logic # Step 5: Place rebalancing orders response = await client.post("/place_order", json={ "symbol": "VTI", "action": "SELL", "totalQuantity": 10, "orderType": "MKT" }) assert response.status_code == 200 order_result = response.json() assert 'orderId' in order_result # Step 6: Verify execution await asyncio.sleep(2) # Wait for order to process response = await client.get("/get_executions") assert response.status_code == 200 ``` #### 5.2 Create cURL Test Scripts **`scripts/test_curl.sh`****:** ```bash #!/bin/bash BASE_URL="http://localhost:8000/api/v1" echo "=== Test 1: Connect to TWS ===" curl -X POST $BASE_URL/connect_tws \ -H "Content-Type: application/json" \ -d '{"host": "127.0.0.1", "port": 7496, "clientId": 1}' echo -e "\n\n=== Test 2: Get Positions ===" curl $BASE_URL/get_positions echo -e "\n\n=== Test 3: Get Historical Data ===" curl -X POST $BASE_URL/get_historical_data \ -H "Content-Type: application/json" \ -d '{ "symbol": "VTI", "durationStr": "30 D", "barSizeSetting": "1 day" }' echo -e "\n\n=== Test 4: Get Account Summary ===" curl $BASE_URL/get_account_summary echo -e "\n\n=== Test 5: Place Market Order ===" curl -X POST $BASE_URL/place_order \ -H "Content-Type: application/json" \ -d '{ "symbol": "AAPL", "action": "BUY", "totalQuantity": 1, "orderType": "MKT" }' echo -e "\n\n=== Test 6: Get Open Orders ===" curl $BASE_URL/get_open_orders echo -e "\n\n=== Test 7: Stream Market Data (5 seconds) ===" timeout 5 curl -N $BASE_URL/stream_market_data?symbol=SPY echo -e "\n\nAll tests completed!" ``` ### Deliverables - ✅ Integration test suite - ✅ cURL test scripts - ✅ Test documentation ### Acceptance Criteria - Integration tests pass against a paper trading account - cURL scripts execute successfully - All e2e workflow steps are validated --- ## Phase 6: Dockerization and Deployment Setup ### Objectives - Create Docker configuration - Support both local development and containerized deployment - Provide clear setup instructions ### Tasks #### 6.1 Create Dockerfile ``` FROM python:3.11-slim WORKDIR /app # Install uv RUN pip install uv # Copy project files COPY pyproject.toml uv.lock ./ COPY src/ ./src/ # Install dependencies RUN uv sync --frozen # Expose server port EXPOSE 8000 # Run the server CMD ["uv", "run", "uvicorn", "src.server:app", "--host", "0.0.0.0", "--port", "8000"] ``` #### 6.2 Create docker-compose.yml ```yaml version: '3.8' services: mcp-server: build: . ports: - "8000:8000" environment: - TWS_HOST=${TWS_HOST:-host.docker.internal} - TWS_PORT=${TWS_PORT:-7496} - TWS_CLIENT_ID=${TWS_CLIENT_ID:-1} extra_hosts: - "host.docker.internal:host-gateway" restart: unless-stopped ``` #### 6.3 Create Setup Documentation **`docs/SETUP.md`****:** - Prerequisites (TWS/IB Gateway, Python 3.11+, uv) - Local development setup - Docker deployment - Configuration guide - Troubleshooting ### Deliverables - ✅ Dockerfile - ✅ docker-compose.yml - ✅ Setup documentation ### Acceptance Criteria - Server runs successfully in Docker - Local development environment works - Documentation is clear and complete --- ## Phase 7: Documentation and Delivery ### Objectives - Complete all documentation - Create final traceability matrix - Prepare delivery package ### Tasks #### 7.1 Complete API Documentation **`docs/API.md`****:** - List all MCP tools with descriptions - Request/response examples - Error codes and handling - Usage examples for each tool #### 7.2 Update Traceability Matrix Create a comprehensive matrix linking: - E2E workflow steps - TWS API functions - MCP tools - Unit tests - Integration tests #### 7.3 Create README.md Include: - Project overview - Quick start guide - Architecture diagram - Links to detailed documentation - Contributing guidelines ### Deliverables - ✅ Complete API documentation - ✅ Updated traceability matrix - ✅ Comprehensive README - ✅ All code and tests ### Acceptance Criteria - All documentation is complete and accurate - Traceability matrix covers all requirements - README provides clear getting started guide --- ## Testing Strategy Summary ### Unit Tests - **Target Coverage:** >80% - **Focus:** Individual functions and methods - **Tools:** pytest, pytest-asyncio, pytest-mock - **Execution:** `uv run pytest tests/unit/` ### Integration Tests - **Target:** End-to-end workflows - **Focus:** Complete user scenarios - **Tools:** pytest, httpx - **Execution:** `uv run pytest tests/integration/` ### SIT (System Integration Testing) - **Target:** Real TWS paper account - **Focus:** Actual API interactions - **Tools:** cURL, bash scripts - **Execution:** `./scripts/test_curl.sh` --- ## Timeline Estimate | Phase | Duration | Dependencies | | --- | --- | --- | | Phase 1: Project Setup | 0.5 days | None | | Phase 2: TWS Client Wrapper | 2 days | Phase 1 | | Phase 3: MCP Server Implementation | 3 days | Phase 2 | | Phase 4: Unit Tests | 2 days | Phase 3 | | Phase 5: Integration Tests | 2 days | Phase 3 | | Phase 6: Dockerization | 1 day | Phase 3 | | Phase 7: Documentation | 1.5 days | All phases | | **Total** | **12 days** | | --- ## Success Criteria The implementation will be considered complete when: 1. ✅ All MCP tools are implemented and functional 1. ✅ Unit test coverage is >80% 1. ✅ All integration tests pass 1. ✅ cURL test scripts execute successfully against paper account 1. ✅ Docker deployment works 1. ✅ Documentation is complete and accurate 1. ✅ Traceability matrix is comprehensive 1. ✅ E2E portfolio rebalancing workflow can be executed end-to-end --- ## Risk Mitigation ### Risk 1: TWS API Connection Issues - **Mitigation:** Implement robust retry logic and connection monitoring - **Fallback:** Provide detailed error messages and troubleshooting guide ### Risk 2: MCP SDK Compatibility - **Mitigation:** Use latest stable version, follow official examples - **Fallback:** Engage with MCP community for support ### Risk 3: Testing Environment Setup - **Mitigation:** Provide Docker-based testing environment - **Fallback:** Detailed setup guide for local TWS paper account --- ## Next Steps 1. Review and approve this plan 1. Set up development environment 1. Begin Phase 1 implementation 1. Schedule regular progress reviews 1. Iterate based on feedback and testing results

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/haymant/tws-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server