Skip to main content
Glama
knishioka

IB Analytics MCP Server

by knishioka
client.py14.5 kB
"""Interactive Brokers Flex Query API client with multi-account support""" import asyncio import time from datetime import date, datetime import defusedxml.ElementTree as ET # noqa: N817 import httpx import requests from pydantic import ValidationError from ib_sec_mcp.api.models import APICredentials, FlexStatement class FlexQueryError(Exception): """Base exception for Flex Query errors""" pass class FlexQueryAPIError(FlexQueryError): """API request/response errors""" pass class FlexQueryValidationError(FlexQueryError): """Data validation errors""" pass class FlexQueryClient: """ Interactive Brokers Flex Query API client (Version 3) Supports both single and multi-account data fetching with async capabilities. Example: # Single account client = FlexQueryClient(query_id="123", token="abc") data = client.fetch_statement(start_date=date(2025, 1, 1)) # Multiple accounts client = FlexQueryClient(credentials=[cred1, cred2]) results = await client.fetch_all_statements_async(start_date=date(2025, 1, 1)) """ BASE_URL_SEND = ( "https://ndcdyn.interactivebrokers.com/AccountManagement/FlexWebService/SendRequest" ) BASE_URL_GET = ( "https://gdcdyn.interactivebrokers.com/AccountManagement/FlexWebService/GetStatement" ) API_VERSION = "3" USER_AGENT = "ib-analytics/0.1.0" def __init__( self, query_id: str | None = None, token: str | None = None, credentials: list[APICredentials] | None = None, timeout: int = 30, max_retries: int = 3, retry_delay: int = 5, ): """ Initialize Flex Query client Args: query_id: Single account query ID token: Single account token credentials: List of credentials for multi-account support timeout: Request timeout in seconds max_retries: Maximum retry attempts retry_delay: Delay between retries in seconds """ if credentials: self.credentials = credentials elif query_id and token: self.credentials = [APICredentials(query_id=query_id, token=token)] else: raise ValueError("Either (query_id, token) or credentials must be provided") self.timeout = timeout self.max_retries = max_retries self.retry_delay = retry_delay def fetch_statement( self, start_date: date | None = None, end_date: date | None = None, credential_index: int = 0, ) -> FlexStatement: """ Fetch statement for a single account (synchronous) Args: start_date: Statement start date (uses query default if None) end_date: Statement end date (uses query default if None) credential_index: Index of credentials to use (for multi-account) Returns: FlexStatement with raw data Raises: FlexQueryAPIError: If API request fails FlexQueryValidationError: If response validation fails """ if credential_index >= len(self.credentials): raise ValueError(f"Invalid credential index: {credential_index}") cred = self.credentials[credential_index] # Step 1: Send request reference_code = self._send_request(cred, start_date, end_date) # Step 2: Poll for statement (with retries) for attempt in range(self.max_retries): time.sleep(self.retry_delay) try: statement = self._get_statement(cred, reference_code, start_date, end_date) return statement except FlexQueryAPIError as e: if "not yet ready" in str(e).lower() and attempt < self.max_retries - 1: continue raise raise FlexQueryAPIError(f"Statement not ready after {self.max_retries} attempts") def _send_request( self, cred: APICredentials, start_date: date | None, end_date: date | None, ) -> str: """Send request to generate statement""" params: dict[str, str] = { "t": cred.token, "q": cred.query_id, "v": self.API_VERSION, } headers = {"User-Agent": self.USER_AGENT} try: response = requests.get( self.BASE_URL_SEND, params=params, headers=headers, timeout=self.timeout, ) response.raise_for_status() except requests.RequestException as e: raise FlexQueryAPIError(f"SendRequest failed: {e}") from e # Parse XML response try: root = ET.fromstring(response.text) status_elem = root.find(".//Status") reference_code_elem = root.find(".//ReferenceCode") error_code_elem = root.find(".//ErrorCode") error_msg_elem = root.find(".//ErrorMessage") if status_elem is None: raise FlexQueryAPIError("Invalid response: missing Status element") status = status_elem.text reference_code = reference_code_elem.text if reference_code_elem is not None else None error_code = error_code_elem.text if error_code_elem is not None else None error_msg = error_msg_elem.text if error_msg_elem is not None else None if status != "Success": raise FlexQueryAPIError(f"SendRequest failed: {error_code} - {error_msg}") if not reference_code: raise FlexQueryAPIError("No reference code in response") return str(reference_code) except ET.ParseError as e: raise FlexQueryAPIError(f"Failed to parse XML response: {e}") from e def _get_statement( self, cred: APICredentials, reference_code: str, start_date: date | None, end_date: date | None, ) -> FlexStatement: """Get statement using reference code""" params: dict[str, str] = { "t": cred.token, "q": reference_code, "v": self.API_VERSION, } headers = {"User-Agent": self.USER_AGENT} try: response = requests.get( self.BASE_URL_GET, params=params, headers=headers, timeout=self.timeout, ) response.raise_for_status() except requests.RequestException as e: raise FlexQueryAPIError(f"GetStatement failed: {e}") from e # Check if statement is ready if "Statement generation in progress" in response.text: raise FlexQueryAPIError("Statement not yet ready") # Parse response to extract metadata account_id = self._extract_account_id(response.text) generated_time = datetime.now() # Use provided dates or extract from response from_date = start_date or date.today() to_date = end_date or date.today() try: return FlexStatement( query_id=cred.query_id, account_id=account_id, from_date=from_date, to_date=to_date, when_generated=generated_time, raw_data=response.text, ) except ValidationError as e: raise FlexQueryValidationError(f"Statement validation failed: {e}") from e def _extract_account_id(self, data: str) -> str: """Extract account ID from CSV/XML data""" # Try CSV format first lines = data.strip().split("\n") for line in lines: if "ClientAccountID" in line and "," in line: # Next line should have the account ID parts = line.split(",") if len(parts) > 0: # Look for account ID pattern (U followed by digits) for part in parts: if part.startswith("U") and part[1:].isdigit(): return part # Try XML format try: root = ET.fromstring(data) account_elem = root.find(".//*[@accountId]") if account_elem is not None: account_id = account_elem.get("accountId") if account_id: return str(account_id) except ET.ParseError: pass # Fallback: return query ID return "UNKNOWN" async def fetch_statement_async( self, start_date: date | None = None, end_date: date | None = None, credential_index: int = 0, ) -> FlexStatement: """Async version of fetch_statement""" if credential_index >= len(self.credentials): raise ValueError(f"Invalid credential index: {credential_index}") cred = self.credentials[credential_index] # Step 1: Send request reference_code = await self._send_request_async(cred, start_date, end_date) # Step 2: Poll for statement for attempt in range(self.max_retries): await asyncio.sleep(self.retry_delay) try: statement = await self._get_statement_async( cred, reference_code, start_date, end_date ) return statement except FlexQueryAPIError as e: if "not yet ready" in str(e).lower() and attempt < self.max_retries - 1: continue raise raise FlexQueryAPIError(f"Statement not ready after {self.max_retries} attempts") async def _send_request_async( self, cred: APICredentials, start_date: date | None, end_date: date | None, ) -> str: """Async version of _send_request""" params: dict[str, str] = { "t": cred.token, "q": cred.query_id, "v": self.API_VERSION, } headers = {"User-Agent": self.USER_AGENT} async with httpx.AsyncClient(timeout=self.timeout) as client: try: response = await client.get(self.BASE_URL_SEND, params=params, headers=headers) response.raise_for_status() except httpx.HTTPError as e: raise FlexQueryAPIError(f"SendRequest failed: {e}") from e # Parse XML (same logic as sync version) try: root = ET.fromstring(response.text) status_elem = root.find(".//Status") reference_code_elem = root.find(".//ReferenceCode") error_code_elem = root.find(".//ErrorCode") error_msg_elem = root.find(".//ErrorMessage") if status_elem is None: raise FlexQueryAPIError("Invalid response: missing Status element") status = status_elem.text reference_code = ( reference_code_elem.text if reference_code_elem is not None else None ) error_code = error_code_elem.text if error_code_elem is not None else None error_msg = error_msg_elem.text if error_msg_elem is not None else None if status != "Success": raise FlexQueryAPIError(f"SendRequest failed: {error_code} - {error_msg}") if not reference_code: raise FlexQueryAPIError("No reference code in response") return str(reference_code) except ET.ParseError as e: raise FlexQueryAPIError(f"Failed to parse XML response: {e}") from e async def _get_statement_async( self, cred: APICredentials, reference_code: str, start_date: date | None, end_date: date | None, ) -> FlexStatement: """Async version of _get_statement""" params: dict[str, str] = { "t": cred.token, "q": reference_code, "v": self.API_VERSION, } headers = {"User-Agent": self.USER_AGENT} async with httpx.AsyncClient(timeout=self.timeout) as client: try: response = await client.get(self.BASE_URL_GET, params=params, headers=headers) response.raise_for_status() except httpx.HTTPError as e: raise FlexQueryAPIError(f"GetStatement failed: {e}") from e if "Statement generation in progress" in response.text: raise FlexQueryAPIError("Statement not yet ready") account_id = self._extract_account_id(response.text) generated_time = datetime.now() from_date = start_date or date.today() to_date = end_date or date.today() try: return FlexStatement( query_id=cred.query_id, account_id=account_id, from_date=from_date, to_date=to_date, when_generated=generated_time, raw_data=response.text, ) except ValidationError as e: raise FlexQueryValidationError(f"Statement validation failed: {e}") from e async def fetch_all_statements_async( self, start_date: date | None = None, end_date: date | None = None, ) -> list[FlexStatement]: """ Fetch statements for all configured accounts in parallel Args: start_date: Statement start date end_date: Statement end date Returns: List of FlexStatements for all accounts """ tasks = [ self.fetch_statement_async(start_date, end_date, i) for i in range(len(self.credentials)) ] return await asyncio.gather(*tasks) def fetch_all_statements( self, start_date: date | None = None, end_date: date | None = None, ) -> list[FlexStatement]: """ Fetch statements for all configured accounts (synchronous) Args: start_date: Statement start date end_date: Statement end date Returns: List of FlexStatements for all accounts """ return [self.fetch_statement(start_date, end_date, i) for i in range(len(self.credentials))]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/knishioka/ib-sec-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server