# Detailed Implementation and Testing Plan for IBKR TWS MCP Server E2E Case
This document provides a comprehensive, step-by-step plan for implementing and testing the IBKR TWS MCP server to support the portfolio rebalancing e2e case described in `Pasted_content_02.txt`. The plan is structured into phases, each with specific tasks, deliverables, and acceptance criteria.
## Overview
The goal is to build a production-ready MCP server that exposes IBKR TWS API functionality as MCP tools, enabling automated portfolio rebalancing workflows. The implementation will use the official `modelcontextprotocol/python-sdk`, `ib_insync` for TWS API interaction, and `uv` for Python environment management. The server will be exposed over the SDK's Streamable HTTP transport (which can stream responses as Server-Sent Events), be containerized with Docker, and include comprehensive unit and integration tests.
## Phase 1: Project Setup and Environment Configuration
### Objectives
- Set up the project structure using `uv`
- Configure development environment
- Install and verify dependencies
### Tasks
#### 1.1 Initialize Project with `uv`
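```bash
# Create project directory
mkdir ibkr-tws-mcp-server
cd ibkr-tws-mcp-server
# Initialize uv project
uv init
# Add runtime dependencies
uv add "mcp[cli]" ib-insync starlette uvicorn
# Add development dependencies
# (pytest-cov backs the --cov flags in Phase 4; httpx is used by the integration tests)
uv add --dev pytest pytest-asyncio pytest-mock pytest-cov httpx
```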
#### 1.2 Create Project Structure
```
ibkr-tws-mcp-server/
├── src/
│   ├── __init__.py
│   ├── server.py           # Main MCP server implementation
│   ├── tws_client.py       # TWS client wrapper
│   ├── models.py           # Pydantic models for requests/responses
│   └── config.py           # Configuration management
├── tests/
│   ├── __init__.py
│   ├── unit/
│   │   ├── __init__.py
│   │   ├── test_tws_client.py
│   │   └── test_server_tools.py
│   ├── integration/
│   │   ├── __init__.py
│   │   └── test_e2e_workflow.py
│   └── fixtures/
│       ├── sample_positions.json
│       ├── sample_historical_data.json
│       └── sample_account_summary.json
├── scripts/
│   ├── run_server.sh
│   └── test_curl.sh        # cURL test scripts
├── docs/
│   ├── API.md              # API documentation
│   └── SETUP.md            # Setup instructions
├── Dockerfile
├── docker-compose.yml
├── .env.example
├── pyproject.toml
├── uv.lock
└── README.md
```
#### 1.3 Configure Environment Variables
Create `.env.example`:
```bash
# TWS/IB Gateway Connection
# Default API ports: TWS 7496 (live) / 7497 (paper); IB Gateway 4001 (live) / 4002 (paper)
TWS_HOST=127.0.0.1
TWS_PORT=7496
TWS_CLIENT_ID=1
# Server Configuration
SERVER_HOST=0.0.0.0
SERVER_PORT=8000
API_PREFIX=/api/v1
# Logging
LOG_LEVEL=INFO
```
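The project structure lists a `config.py` for configuration management, but the plan does not show it. A minimal sketch of how it might read the variables above, using only the standard library; the `Settings` and `load_settings` names are assumptions for illustration, and the defaults simply mirror `.env.example`:

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    """Runtime settings mirroring the keys in .env.example (hypothetical class)."""
    tws_host: str
    tws_port: int
    tws_client_id: int
    server_host: str
    server_port: int
    api_prefix: str
    log_level: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Build Settings from a mapping; falls back to os.environ when none is given."""
    env = os.environ if env is None else env
    return Settings(
        tws_host=env.get("TWS_HOST", "127.0.0.1"),
        tws_port=int(env.get("TWS_PORT", "7496")),
        tws_client_id=int(env.get("TWS_CLIENT_ID", "1")),
        server_host=env.get("SERVER_HOST", "0.0.0.0"),
        server_port=int(env.get("SERVER_PORT", "8000")),
        api_prefix=env.get("API_PREFIX", "/api/v1"),
        log_level=env.get("LOG_LEVEL", "INFO"),
    )


# Example: override only the port to point at a TWS paper session
paper = load_settings({"TWS_PORT": "7497"})
```

Passing the mapping explicitly keeps the loader easy to unit test without touching the process environment.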
### Deliverables
- ✅ Project initialized with `uv`
- ✅ All dependencies installed
- ✅ Project structure created
- ✅ Environment configuration files in place
### Acceptance Criteria
- `uv run python --version` executes successfully
- All directories and files are created as specified
- Dependencies are locked in `uv.lock`
---
## Phase 2: Implement TWS Client Wrapper
### Objectives
- Create a robust wrapper around `ib_insync` for TWS API interactions
- Implement connection management and error handling
- Provide clean interfaces for all required TWS API functions
### Tasks
#### 2.1 Implement `tws_client.py`
**Key Components:**
- `TWSClient` class with async connection management
- Methods for all required TWS API operations:
  - `connect()`, `disconnect()`, `is_connected()`
  - `get_contract_details()`
  - `get_historical_data()`
  - `subscribe_market_data()`, `unsubscribe_market_data()`
  - `get_account_summary()`, `get_positions()`
  - `subscribe_account_updates()`, `unsubscribe_account_updates()`
  - `place_order()`, `cancel_order()`
  - `get_open_orders()`, `get_executions()`
  - `get_pnl()`, `get_pnl_single()`
**Example Implementation Snippet:**
```python
from ib_insync import IB, Stock, MarketOrder, util
from typing import List, Dict, Any, Optional
import asyncio


class TWSClient:
    def __init__(self):
        self.ib = IB()
        self._connected = False

    async def connect(self, host: str, port: int, client_id: int) -> bool:
        """Connect to TWS/IB Gateway."""
        try:
            await self.ib.connectAsync(host, port, clientId=client_id)
            self._connected = True
            return True
        except Exception as e:
            raise ConnectionError(f"Failed to connect to TWS: {e}") from e

    async def disconnect(self) -> None:
        """Disconnect from TWS/IB Gateway."""
        self.ib.disconnect()
        self._connected = False

    def is_connected(self) -> bool:
        """Return True after connect() succeeds and until disconnect() is called."""
        return self._connected

    async def get_positions(self) -> List[Dict[str, Any]]:
        """Get current portfolio positions."""
        if not self._connected:
            raise RuntimeError("Not connected to TWS")
        positions = self.ib.positions()
        return [
            {
                "account": pos.account,
                "contract": util.dataclassAsDict(pos.contract),
                "position": pos.position,
                "avgCost": pos.avgCost,
            }
            for pos in positions
        ]

    # ... implement other methods
```
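Every data method in the wrapper needs the same "not connected" guard shown in `get_positions()`. One way to avoid repeating it is a small decorator; the sketch below is an assumption about how this could be structured, not part of the plan, and `requires_connection` and `DemoClient` are hypothetical names (the latter stands in for `TWSClient` so the example runs without `ib_insync`):

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, TypeVar

T = TypeVar("T")


def requires_connection(
    method: Callable[..., Awaitable[T]]
) -> Callable[..., Awaitable[T]]:
    """Raise RuntimeError unless the instance reports an active connection."""

    @functools.wraps(method)
    async def wrapper(self: Any, *args: Any, **kwargs: Any) -> T:
        if not self.is_connected():
            raise RuntimeError("Not connected to TWS")
        return await method(self, *args, **kwargs)

    return wrapper


class DemoClient:
    """Hypothetical stand-in for TWSClient, used only to exercise the decorator."""

    def __init__(self) -> None:
        self._connected = False

    def is_connected(self) -> bool:
        return self._connected

    @requires_connection
    async def get_positions(self) -> list:
        # A real implementation would query TWS here
        return []
```

With this in place, each wrapped method can focus on the TWS call itself, and the guard's error message stays consistent across the client.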
#### 2.2 Implement `models.py`
Define Pydantic models for structured data:
```python
from pydantic import BaseModel
from typing import Optional, List


class ContractRequest(BaseModel):
    symbol: str
    secType: str = "STK"
    exchange: str = "SMART"
    currency: str = "USD"


class HistoricalDataRequest(BaseModel):
    contract: ContractRequest
    durationStr: str = "1 Y"
    barSizeSetting: str = "1 day"
    whatToShow: str = "TRADES"
    useRTH: int = 1


class OrderRequest(BaseModel):
    contract: ContractRequest
    action: str  # BUY or SELL
    totalQuantity: int
    orderType: str = "MKT"
    lmtPrice: Optional[float] = None
    transmit: bool = True

# ... more models
```
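As written, `OrderRequest` accepts any string for `action` and `orderType`, and a limit order without `lmtPrice` would pass validation. The sketch below shows, in plain standard-library Python, the kind of checks that could be added (for example as Pydantic validators). The `validate_order` helper and the rule that only `LMT` orders require a price are assumptions for illustration:

```python
from typing import List, Optional

VALID_ACTIONS = {"BUY", "SELL"}
# Assumption: in this sketch only limit orders require lmtPrice
PRICED_ORDER_TYPES = {"LMT"}


def validate_order(
    action: str,
    total_quantity: int,
    order_type: str = "MKT",
    lmt_price: Optional[float] = None,
) -> List[str]:
    """Return a list of problems; an empty list means the order looks valid."""
    problems: List[str] = []
    if action.upper() not in VALID_ACTIONS:
        problems.append(f"action must be BUY or SELL, got {action!r}")
    if total_quantity <= 0:
        problems.append("totalQuantity must be positive")
    if order_type.upper() in PRICED_ORDER_TYPES and lmt_price is None:
        problems.append(f"{order_type} orders require lmtPrice")
    if lmt_price is not None and lmt_price <= 0:
        problems.append("lmtPrice must be positive")
    return problems
```

Rejecting malformed orders before they reach `place_order()` keeps bad input from ever being transmitted to TWS.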
### Deliverables
- ✅ `tws_client.py` with all required methods
- ✅ `models.py` with Pydantic models
- ✅ Error handling and logging implemented
### Acceptance Criteria
- All TWS API operations are wrapped in clean async methods
- Connection management is robust with proper error handling
- Code follows Python best practices and type hints
---
## Phase 3: Implement MCP Server with FastMCP
### Objectives
- Create the MCP server using `FastMCP`
- Define all MCP tools corresponding to TWS API functionalities
- Implement lifespan management for TWS client
- Mount the server on `/api/v1` prefix
### Tasks
#### 3.1 Implement `server.py`
**Key Components:**
- Lifespan context manager for TWS client initialization
- MCP tools decorated with `@mcp.tool()`
- HTTP transport configuration with `/api/v1` prefix
**Example Implementation:**
```python
from mcp.server.fastmcp import FastMCP, Context
from mcp.server.session import ServerSession
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator, Optional
from starlette.applications import Starlette
from starlette.routing import Mount
from .tws_client import TWSClient
from .models import ContractRequest, HistoricalDataRequest, OrderRequest


@dataclass
class AppContext:
    """Application context with TWS client."""
    tws: TWSClient


@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """Manage TWS client lifecycle."""
    tws = TWSClient()
    # Note: Connection is handled by connect_tws tool, not here
    try:
        yield AppContext(tws=tws)
    finally:
        if tws.is_connected():
            await tws.disconnect()


# Create MCP server
mcp = FastMCP("IBKR TWS MCP Server", lifespan=app_lifespan)


# Define tools
@mcp.tool()
async def connect_tws(
    ctx: Context[ServerSession, AppContext],
    host: str = "127.0.0.1",
    port: int = 7496,
    clientId: int = 1
) -> dict:
    """Connect to TWS/IB Gateway."""
    tws = ctx.request_context.lifespan_context.tws
    await tws.connect(host, port, clientId)
    return {"status": "connected", "host": host, "port": port}


@mcp.tool()
async def get_positions(ctx: Context[ServerSession, AppContext]) -> list:
    """Get current portfolio positions."""
    tws = ctx.request_context.lifespan_context.tws
    return await tws.get_positions()


@mcp.tool()
async def get_historical_data(
    ctx: Context[ServerSession, AppContext],
    symbol: str,
    secType: str = "STK",
    exchange: str = "SMART",
    currency: str = "USD",
    durationStr: str = "1 Y",
    barSizeSetting: str = "1 day",
    whatToShow: str = "TRADES"
) -> list:
    """Get historical market data for a contract."""
    tws = ctx.request_context.lifespan_context.tws
    contract = ContractRequest(
        symbol=symbol,
        secType=secType,
        exchange=exchange,
        currency=currency
    )
    return await tws.get_historical_data(
        contract, durationStr, barSizeSetting, whatToShow
    )


@mcp.tool()
async def place_order(
    ctx: Context[ServerSession, AppContext],
    symbol: str,
    action: str,
    totalQuantity: int,
    orderType: str = "MKT",
    lmtPrice: Optional[float] = None,
    secType: str = "STK",
    exchange: str = "SMART",
    currency: str = "USD"
) -> dict:
    """Place an order."""
    tws = ctx.request_context.lifespan_context.tws
    order_req = OrderRequest(
        contract=ContractRequest(
            symbol=symbol,
            secType=secType,
            exchange=exchange,
            currency=currency
        ),
        action=action,
        totalQuantity=totalQuantity,
        orderType=orderType,
        lmtPrice=lmtPrice
    )
    return await tws.place_order(order_req)

# ... implement all other tools


@asynccontextmanager
async def starlette_lifespan(app: Starlette) -> AsyncIterator[None]:
    """Run the MCP session manager for the lifetime of the ASGI app."""
    async with mcp.session_manager.run():
        yield


# Mount on /api/v1 (the Streamable HTTP endpoint is then served under this prefix)
app = Starlette(
    routes=[
        Mount('/api/v1', app=mcp.streamable_http_app()),
    ],
    lifespan=starlette_lifespan,
)
```
#### 3.2 Implement Streaming Tools
For streaming data (market data, account updates), a tool can start a background task and push each update to the client through the MCP context (here via `ctx.info`):
```python
import asyncio

# Keep strong references so background tasks are not garbage-collected mid-stream
_stream_tasks: set = set()


@mcp.tool()
async def stream_market_data(
    ctx: Context[ServerSession, AppContext],
    symbol: str,
    secType: str = "STK"
) -> str:
    """Start streaming real-time market data and return a status message."""
    tws = ctx.request_context.lifespan_context.tws

    # Start streaming and send updates via context
    async def send_updates():
        async for data in tws.subscribe_market_data(symbol, secType):
            await ctx.info(f"Market data: {data}")

    # Start background task
    task = asyncio.create_task(send_updates())
    _stream_tasks.add(task)
    task.add_done_callback(_stream_tasks.discard)
    return f"Streaming market data for {symbol}"
```
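A streaming tool that starts a background task also needs a way to stop it, which is what `unsubscribe_market_data()` in Phase 2 implies. The following is a minimal sketch of a registry that hands out subscription IDs and cancels tasks on request; the `SubscriptionRegistry` class and the `sub-N` ID format are assumptions for illustration, not part of the MCP SDK or `ib_insync`:

```python
import asyncio
from typing import Coroutine, Dict, List


class SubscriptionRegistry:
    """Holds references to streaming tasks so they can be cancelled later."""

    def __init__(self) -> None:
        self._tasks: Dict[str, asyncio.Task] = {}
        self._next_id = 0

    def start(self, coro: Coroutine) -> str:
        """Schedule the coroutine and return a new subscription ID."""
        self._next_id += 1
        sub_id = f"sub-{self._next_id}"
        task = asyncio.create_task(coro)
        self._tasks[sub_id] = task
        # Drop the entry automatically if the stream ends on its own
        task.add_done_callback(lambda _t: self._tasks.pop(sub_id, None))
        return sub_id

    def active(self) -> List[str]:
        """Return the IDs of subscriptions that are still registered."""
        return list(self._tasks)

    async def stop(self, sub_id: str) -> bool:
        """Cancel a subscription; return False if the ID is unknown."""
        task = self._tasks.pop(sub_id, None)
        if task is None:
            return False
        task.cancel()
        # Wait for the cancellation to finish without re-raising it here
        await asyncio.gather(task, return_exceptions=True)
        return True
```

A `stream_market_data` tool could return the ID from `start()`, and a matching unsubscribe tool could pass it to `stop()`.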
### Deliverables
- ✅ `server.py` with all MCP tools implemented
- ✅ Lifespan management for TWS client
- ✅ HTTP transport mounted on `/api/v1`
### Acceptance Criteria
- All tools are properly decorated and documented
- Tools can access TWS client via context
- Server can be started and responds to requests
---
## Phase 4: Implement Unit Tests
### Objectives
- Write comprehensive unit tests for all components
- Achieve >80% code coverage
- Use mocking to isolate components
### Tasks
#### 4.1 Create Test Fixtures
**`tests/fixtures/sample_positions.json`:**
```json
[
  {
    "account": "DU123456",
    "contract": {
      "symbol": "VTI",
      "secType": "STK",
      "exchange": "ARCA",
      "currency": "USD"
    },
    "position": 100,
    "avgCost": 200.50
  },
  {
    "account": "DU123456",
    "contract": {
      "symbol": "TLT",
      "secType": "STK",
      "exchange": "NASDAQ",
      "currency": "USD"
    },
    "position": 50,
    "avgCost": 95.30
  }
]
```
#### 4.2 Implement Unit Tests
**`tests/unit/test_tws_client.py`:**
```python
import json
from pathlib import Path

import pytest
from unittest.mock import AsyncMock, patch
from ib_insync import Contract, Position
from src.tws_client import TWSClient

FIXTURES_DIR = Path(__file__).parent.parent / "fixtures"


@pytest.fixture
def load_fixture():
    """Return a loader for JSON files in tests/fixtures/."""
    def _load(name: str):
        return json.loads((FIXTURES_DIR / name).read_text())
    return _load


@pytest.mark.asyncio
async def test_connect_success():
    """Test successful connection to TWS."""
    client = TWSClient()
    with patch.object(client.ib, 'connectAsync', new_callable=AsyncMock) as mock_connect:
        mock_connect.return_value = None
        result = await client.connect("127.0.0.1", 7496, 1)
    assert result is True
    assert client.is_connected() is True
    mock_connect.assert_called_once_with("127.0.0.1", 7496, clientId=1)


@pytest.mark.asyncio
async def test_get_positions(load_fixture):
    """Test getting positions."""
    client = TWSClient()
    client._connected = True
    # Load fixture data
    expected_positions = load_fixture('sample_positions.json')
    with patch.object(client.ib, 'positions') as mock_positions:
        # Convert fixture rows to ib_insync Position tuples
        mock_positions.return_value = [
            Position(p['account'], Contract(**p['contract']), p['position'], p['avgCost'])
            for p in expected_positions
        ]
        positions = await client.get_positions()
    assert len(positions) == 2
    assert positions[0]['contract']['symbol'] == 'VTI'
```
**`tests/unit/test_server_tools.py`:**
```python
import pytest
from unittest.mock import AsyncMock, MagicMock
from src.server import connect_tws, get_positions


@pytest.mark.asyncio
async def test_connect_tws_tool():
    """Test connect_tws MCP tool."""
    # Create mock context
    mock_ctx = MagicMock()
    mock_tws = AsyncMock()
    mock_ctx.request_context.lifespan_context.tws = mock_tws
    result = await connect_tws(mock_ctx, "127.0.0.1", 7496, 1)
    assert result['status'] == 'connected'
    mock_tws.connect.assert_called_once_with("127.0.0.1", 7496, 1)
```
#### 4.3 Run Tests and Generate Coverage Report
```bash
# Run all unit tests
uv run pytest tests/unit/ -v
# Run with coverage
uv run pytest tests/unit/ --cov=src --cov-report=html --cov-report=term
```
### Deliverables
- ✅ Unit tests for all TWS client methods
- ✅ Unit tests for all MCP tools
- ✅ Test fixtures for mock data
- ✅ Coverage report >80%
### Acceptance Criteria
- All unit tests pass
- Code coverage is above 80%
- Tests are well-documented and maintainable
---
## Phase 5: Implement Integration Tests (SIT)
### Objectives
- Create end-to-end integration tests
- Test the complete workflow from the e2e case
- Validate with cURL commands
### Tasks
#### 5.1 Create Integration Test Suite
**`tests/integration/test_e2e_workflow.py`:**
```python
import pytest
import asyncio
from httpx import AsyncClient

BASE_URL = "http://localhost:8000/api/v1"
# NOTE: FastMCP's Streamable HTTP transport serves a single JSON-RPC endpoint
# rather than one route per tool, so the REST-style paths below assume a thin
# routing layer in front of the MCP tools.


@pytest.mark.asyncio
async def test_portfolio_rebalancing_workflow():
    """Test the complete portfolio rebalancing workflow."""
    async with AsyncClient(base_url=BASE_URL) as client:
        # Step 1: Connect to TWS
        response = await client.post("/connect_tws", json={
            "host": "127.0.0.1",
            "port": 7496,
            "clientId": 1
        })
        assert response.status_code == 200
        assert response.json()['status'] == 'connected'

        # Step 2: Get historical data for optimization
        response = await client.post("/get_historical_data", json={
            "symbol": "VTI",
            "durationStr": "1 Y"
        })
        assert response.status_code == 200
        historical_data = response.json()
        assert len(historical_data) > 0

        # Step 3: Get current positions
        response = await client.get("/get_positions")
        assert response.status_code == 200
        positions = response.json()

        # Step 4: Calculate drift (mock calculation)
        # In real scenario, this would involve optimization logic

        # Step 5: Place rebalancing orders
        response = await client.post("/place_order", json={
            "symbol": "VTI",
            "action": "SELL",
            "totalQuantity": 10,
            "orderType": "MKT"
        })
        assert response.status_code == 200
        order_result = response.json()
        assert 'orderId' in order_result

        # Step 6: Verify execution
        await asyncio.sleep(2)  # Wait for order to process
        response = await client.get("/get_executions")
        assert response.status_code == 200
```
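Step 4 of the workflow is left as a placeholder comment in the test. A minimal sketch of what a drift calculation could look like, assuming the caller supplies per-symbol quantities, prices, and target weights; the function names, prices, and target weights below are illustrative and not taken from the e2e case:

```python
from typing import Dict


def compute_drift(
    quantities: Dict[str, float],
    prices: Dict[str, float],
    target_weights: Dict[str, float],
) -> Dict[str, float]:
    """Return current weight minus target weight for each target symbol."""
    values = {s: quantities.get(s, 0.0) * prices[s] for s in target_weights}
    total = sum(values.values())
    if total <= 0:
        raise ValueError("Portfolio value must be positive")
    return {s: values[s] / total - target_weights[s] for s in target_weights}


def rebalancing_trades(
    quantities: Dict[str, float],
    prices: Dict[str, float],
    target_weights: Dict[str, float],
) -> Dict[str, int]:
    """Whole-share trades that move the portfolio to its target weights.

    Positive numbers mean BUY, negative numbers mean SELL.
    """
    total = sum(quantities.get(s, 0.0) * prices[s] for s in target_weights)
    trades: Dict[str, int] = {}
    for symbol, weight in target_weights.items():
        target_qty = weight * total / prices[symbol]
        delta = round(target_qty - quantities.get(symbol, 0.0))
        if delta != 0:
            trades[symbol] = delta
    return trades


# Hypothetical inputs: quantities match the fixture, prices and targets are made up
quantities = {"VTI": 100, "TLT": 50}
prices = {"VTI": 200.0, "TLT": 100.0}
targets = {"VTI": 0.6, "TLT": 0.4}
drift = compute_drift(quantities, prices, targets)
trades = rebalancing_trades(quantities, prices, targets)
```

With these inputs the portfolio is worth 25,000, VTI is 20 percentage points overweight, and the sketch proposes selling 25 VTI and buying 50 TLT. Each entry in `trades` maps directly onto the `action` and `totalQuantity` arguments of `place_order`.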
#### 5.2 Create cURL Test Scripts
**`scripts/test_curl.sh`:**
```bash
#!/bin/bash
BASE_URL="http://localhost:8000/api/v1"
echo "=== Test 1: Connect to TWS ==="
curl -X POST $BASE_URL/connect_tws \
-H "Content-Type: application/json" \
-d '{"host": "127.0.0.1", "port": 7496, "clientId": 1}'
echo -e "\n\n=== Test 2: Get Positions ==="
curl $BASE_URL/get_positions
echo -e "\n\n=== Test 3: Get Historical Data ==="
curl -X POST $BASE_URL/get_historical_data \
-H "Content-Type: application/json" \
-d '{
"symbol": "VTI",
"durationStr": "30 D",
"barSizeSetting": "1 day"
}'
echo -e "\n\n=== Test 4: Get Account Summary ==="
curl $BASE_URL/get_account_summary
echo -e "\n\n=== Test 5: Place Market Order ==="
curl -X POST $BASE_URL/place_order \
-H "Content-Type: application/json" \
-d '{
"symbol": "AAPL",
"action": "BUY",
"totalQuantity": 1,
"orderType": "MKT"
}'
echo -e "\n\n=== Test 6: Get Open Orders ==="
curl $BASE_URL/get_open_orders
echo -e "\n\n=== Test 7: Stream Market Data (5 seconds) ==="
timeout 5 curl -N "$BASE_URL/stream_market_data?symbol=SPY"
echo -e "\n\nAll tests completed!"
```
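The script above calls one REST-style path per tool. MCP itself exchanges JSON-RPC 2.0 messages, and a tool is invoked with the `tools/call` method, so a server exposed only through `streamable_http_app()` would expect request bodies shaped like the ones below instead, sent after an `initialize` handshake. This is a sketch of the payload only; the `build_tool_call` helper is a hypothetical name:

```python
import json
from typing import Any, Dict, Optional


def build_tool_call(
    name: str,
    arguments: Optional[Dict[str, Any]] = None,
    request_id: int = 1,
) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 `tools/call` request for an MCP server."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments or {}},
    }


# Body equivalent to "Test 5: Place Market Order" in the script above
payload = build_tool_call(
    "place_order",
    {"symbol": "AAPL", "action": "BUY", "totalQuantity": 1, "orderType": "MKT"},
    request_id=5,
)
body = json.dumps(payload)
```

The resulting `body` string is what would be passed to `curl -d` or to an HTTP client's request body.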
### Deliverables
- ✅ Integration test suite
- ✅ cURL test scripts
- ✅ Test documentation
### Acceptance Criteria
- Integration tests pass against a paper trading account
- cURL scripts execute successfully
- All e2e workflow steps are validated
---
## Phase 6: Dockerization and Deployment Setup
### Objectives
- Create Docker configuration
- Support both local development and containerized deployment
- Provide clear setup instructions
### Tasks
#### 6.1 Create Dockerfile
```dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install uv
RUN pip install uv
# Copy project files
COPY pyproject.toml uv.lock ./
COPY src/ ./src/
# Install dependencies
RUN uv sync --frozen
# Expose server port
EXPOSE 8000
# Run the server
CMD ["uv", "run", "uvicorn", "src.server:app", "--host", "0.0.0.0", "--port", "8000"]
```
#### 6.2 Create docker-compose.yml
```yaml
version: '3.8'
services:
  mcp-server:
    build: .
    ports:
      - "8000:8000"
    environment:
      - TWS_HOST=${TWS_HOST:-host.docker.internal}
      - TWS_PORT=${TWS_PORT:-7496}
      - TWS_CLIENT_ID=${TWS_CLIENT_ID:-1}
    extra_hosts:
      - "host.docker.internal:host-gateway"
    restart: unless-stopped
```
#### 6.3 Create Setup Documentation
**`docs/SETUP.md`:**
- Prerequisites (TWS/IB Gateway, Python 3.11+, uv)
- Local development setup
- Docker deployment
- Configuration guide
- Troubleshooting
### Deliverables
- ✅ Dockerfile
- ✅ docker-compose.yml
- ✅ Setup documentation
### Acceptance Criteria
- Server runs successfully in Docker
- Local development environment works
- Documentation is clear and complete
---
## Phase 7: Documentation and Delivery
### Objectives
- Complete all documentation
- Create final traceability matrix
- Prepare delivery package
### Tasks
#### 7.1 Complete API Documentation
**`docs/API.md`:**
- List all MCP tools with descriptions
- Request/response examples
- Error codes and handling
- Usage examples for each tool
#### 7.2 Update Traceability Matrix
Create a comprehensive matrix linking:
- E2E workflow steps
- TWS API functions
- MCP tools
- Unit tests
- Integration tests
#### 7.3 Create README.md
Include:
- Project overview
- Quick start guide
- Architecture diagram
- Links to detailed documentation
- Contributing guidelines
### Deliverables
- ✅ Complete API documentation
- ✅ Updated traceability matrix
- ✅ Comprehensive README
- ✅ All code and tests
### Acceptance Criteria
- All documentation is complete and accurate
- Traceability matrix covers all requirements
- README provides clear getting started guide
---
## Testing Strategy Summary
### Unit Tests
- **Target Coverage:** >80%
- **Focus:** Individual functions and methods
- **Tools:** pytest, pytest-asyncio, pytest-mock
- **Execution:** `uv run pytest tests/unit/`
### Integration Tests
- **Target:** End-to-end workflows
- **Focus:** Complete user scenarios
- **Tools:** pytest, httpx
- **Execution:** `uv run pytest tests/integration/`
### SIT (System Integration Testing)
- **Target:** Real TWS paper account
- **Focus:** Actual API interactions
- **Tools:** cURL, bash scripts
- **Execution:** `./scripts/test_curl.sh`
---
## Timeline Estimate
| Phase | Duration | Dependencies |
| --- | --- | --- |
| Phase 1: Project Setup | 0.5 days | None |
| Phase 2: TWS Client Wrapper | 2 days | Phase 1 |
| Phase 3: MCP Server Implementation | 3 days | Phase 2 |
| Phase 4: Unit Tests | 2 days | Phase 3 |
| Phase 5: Integration Tests | 2 days | Phase 3 |
| Phase 6: Dockerization | 1 day | Phase 3 |
| Phase 7: Documentation | 1.5 days | All phases |
| **Total** | **12 days** | |
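
The 12-day total assumes the phases run one after another. Because Phases 4, 5, and 6 all depend only on Phase 3, they could in principle run in parallel. The sketch below computes the longest dependency chain from the table, treating "All phases" for Phase 7 as Phases 1 through 6; this is an illustration of the arithmetic, not a revised estimate:

```python
from typing import Dict, List, Tuple

# (duration in days, prerequisite phases), copied from the timeline table
PHASES: Dict[int, Tuple[float, List[int]]] = {
    1: (0.5, []),
    2: (2.0, [1]),
    3: (3.0, [2]),
    4: (2.0, [3]),
    5: (2.0, [3]),
    6: (1.0, [3]),
    7: (1.5, [1, 2, 3, 4, 5, 6]),  # "All phases" read as phases 1-6
}


def earliest_finish(phases: Dict[int, Tuple[float, List[int]]]) -> Dict[int, float]:
    """Earliest finish day of each phase when independent phases overlap."""
    finish: Dict[int, float] = {}

    def visit(phase: int) -> float:
        if phase not in finish:
            duration, deps = phases[phase]
            finish[phase] = duration + max((visit(d) for d in deps), default=0.0)
        return finish[phase]

    for phase in phases:
        visit(phase)
    return finish


sequential_total = sum(duration for duration, _ in PHASES.values())
critical_path = max(earliest_finish(PHASES).values())
```

Here `sequential_total` is 12.0 days and `critical_path` is 9.0 days (0.5 + 2 + 3 + 2 + 1.5), which is the shortest possible schedule if enough people are available to work on testing and Dockerization at the same time.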
---
## Success Criteria
The implementation will be considered complete when:
1. ✅ All MCP tools are implemented and functional
1. ✅ Unit test coverage is >80%
1. ✅ All integration tests pass
1. ✅ cURL test scripts execute successfully against paper account
1. ✅ Docker deployment works
1. ✅ Documentation is complete and accurate
1. ✅ Traceability matrix is comprehensive
1. ✅ E2E portfolio rebalancing workflow can be executed end-to-end
---
## Risk Mitigation
### Risk 1: TWS API Connection Issues
- **Mitigation:** Implement robust retry logic and connection monitoring
- **Fallback:** Provide detailed error messages and troubleshooting guide
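
The retry logic mentioned in the mitigation could take the form of exponential backoff around the connect call. A minimal sketch, assuming the caller passes in a zero-argument coroutine function such as a wrapper around `TWSClient.connect`; the `connect_with_retry` name and the default delays are assumptions for illustration:

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_retry(
    connect: Callable[[], Awaitable[object]],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> int:
    """Call `connect` until it succeeds and return the number of attempts used."""
    for attempt in range(1, max_attempts + 1):
        try:
            await connect()
            return attempt
        except (OSError, asyncio.TimeoutError) as exc:
            # ConnectionError is a subclass of OSError, so it is covered here
            if attempt == max_attempts:
                raise ConnectionError(
                    f"Giving up after {attempt} attempts"
                ) from exc
            # Delay doubles each attempt: base, 2*base, 4*base, ... capped at max_delay
            delay = min(base_delay * 2 ** (attempt - 1), max_delay)
            await asyncio.sleep(delay)
    raise ValueError("max_attempts must be at least 1")
```

A caller might use it as `await connect_with_retry(lambda: tws.connect(host, port, client_id))`, where `tws` is the client instance from the lifespan context.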
### Risk 2: MCP SDK Compatibility
- **Mitigation:** Use latest stable version, follow official examples
- **Fallback:** Engage with MCP community for support
### Risk 3: Testing Environment Setup
- **Mitigation:** Provide Docker-based testing environment
- **Fallback:** Detailed setup guide for local TWS paper account
---
## Next Steps
1. Review and approve this plan
1. Set up development environment
1. Begin Phase 1 implementation
1. Schedule regular progress reviews
1. Iterate based on feedback and testing results