Skip to main content
Glama
test_streaming_event_loop_fix.py7.47 kB
#!/usr/bin/env python3 """ Test script to verify streaming tools event loop fix. This script tests: 1. ibkr_stream_market_data completes within 10 seconds 2. ibkr_stream_account_updates doesn't throw event loop error 3. Session remains stable after streaming tools complete """ import asyncio import time from src.tws_client import TWSClient from src.models import ContractRequest async def test_stream_market_data(): """Test that stream_market_data uses non-blocking callback approach.""" print("=" * 80) print("TEST 1: stream_market_data non-blocking callback") print("=" * 80) client = TWSClient() try: # Connect print("Connecting to TWS...") await client.connect() print(f"Connected: {client.is_connected()}") # Test market data streaming req = ContractRequest(symbol="AAPL", secType="STK", exchange="SMART", currency="USD") print(f"\nStreaming market data for AAPL (10 seconds)...") start = time.monotonic() update_count = 0 async for update in client.stream_market_data(req): elapsed = time.monotonic() - start if elapsed >= 10: print(f"Duration limit reached: {elapsed:.1f}s") break if update: # Non-empty update update_count += 1 print(f" [{elapsed:.1f}s] Update {update_count}: {update}") # Check if we're over time if elapsed > 12: print("WARNING: Exceeded 12 seconds, breaking") break elapsed = time.monotonic() - start print(f"\nCompleted in {elapsed:.1f} seconds") print(f"Total updates: {update_count}") if elapsed <= 12: print("✅ PASS: Completed within time limit") else: print("❌ FAIL: Took too long") except Exception as e: print(f"❌ FAIL: {type(e).__name__}: {e}") import traceback traceback.print_exc() finally: if client.is_connected(): await client.disconnect() print("Disconnected") async def test_stream_account_updates(): """Test that stream_account_updates doesn't block event loop.""" print("\n" + "=" * 80) print("TEST 2: stream_account_updates event loop fix") print("=" * 80) client = TWSClient() try: # Connect print("Connecting to TWS...") await client.connect() print(f"Connected: {client.is_connected()}") # Get account account_summary = await client.get_account_summary() if not account_summary: print("No accounts found, skipping test") return account = account_summary[0].get("account", "") print(f"Using account: {account}") # Test account updates streaming print(f"\nStreaming account updates (10 seconds)...") start = time.monotonic() update_count = 0 async for update in client.stream_account_updates(account): elapsed = time.monotonic() - start if elapsed >= 10: print(f"Duration limit reached: {elapsed:.1f}s") break if update: # Non-empty update update_count += 1 update_type = update.get("type", "unknown") print(f" [{elapsed:.1f}s] Update {update_count}: type={update_type}") # Check if we're over time if elapsed > 12: print("WARNING: Exceeded 12 seconds, breaking") break elapsed = time.monotonic() - start print(f"\nCompleted in {elapsed:.1f} seconds") print(f"Total updates: {update_count}") if elapsed <= 12: print("✅ PASS: Completed within time limit, no event loop error") else: print("❌ FAIL: Took too long") except RuntimeError as e: if "event loop is already running" in str(e): print(f"❌ FAIL: Event loop error: {e}") else: print(f"❌ FAIL: {e}") except Exception as e: print(f"❌ FAIL: {type(e).__name__}: {e}") import traceback traceback.print_exc() finally: if client.is_connected(): await client.disconnect() print("Disconnected") async def test_session_stability(): """Test that session remains stable after streaming.""" print("\n" + "=" * 80) print("TEST 3: Session stability after streaming") print("=" * 80) client = TWSClient() try: # Connect print("Connecting to TWS...") await client.connect() print(f"Connected: {client.is_connected()}") # 1. Stream market data req = ContractRequest(symbol="AAPL", secType="STK", exchange="SMART", currency="USD") print("\n1. Streaming market data for 5 seconds...") start = time.monotonic() async for update in client.stream_market_data(req): if (time.monotonic() - start) >= 5: break print(f" Completed in {time.monotonic() - start:.1f}s") # 2. Test regular tool immediately after print("\n2. Testing regular tool (get_positions)...") positions = await client.get_positions() print(f" Got {len(positions)} positions") # 3. Stream again print("\n3. Streaming market data again for 5 seconds...") start = time.monotonic() async for update in client.stream_market_data(req): if (time.monotonic() - start) >= 5: break print(f" Completed in {time.monotonic() - start:.1f}s") # 4. Test regular tool again print("\n4. Testing regular tool again (get_account_summary)...") account_summary = await client.get_account_summary() print(f" Got {len(account_summary)} accounts") print("\n✅ PASS: Session remained stable through multiple streaming calls") except Exception as e: print(f"❌ FAIL: {type(e).__name__}: {e}") import traceback traceback.print_exc() finally: if client.is_connected(): await client.disconnect() print("Disconnected") async def main(): """Run all tests.""" print("\n" + "=" * 80) print("STREAMING TOOLS EVENT LOOP FIX - TEST SUITE") print("=" * 80) print("\nThis test suite verifies:") print("1. Market data streaming completes within time limit") print("2. Account updates streaming doesn't block event loop") print("3. Session remains stable after streaming operations") print("\n" + "=" * 80 + "\n") # Run tests sequentially await test_stream_market_data() await asyncio.sleep(2) # Brief pause between tests await test_stream_account_updates() await asyncio.sleep(2) await test_session_stability() print("\n" + "=" * 80) print("TEST SUITE COMPLETE") print("=" * 80) if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\n\nTest interrupted by user") except Exception as e: print(f"\nFatal error: {e}") import traceback traceback.print_exc()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/haymant/tws-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server