Skip to main content
Glama
test_news_streaming.py2.01 kB
#!/usr/bin/env python3 """ Test news bulletins streaming with the MCP server. """ import requests import json import time BASE_URL = "http://localhost:8000/api/v1/mcp" def make_request(method, params=None): """Make a JSON-RPC request to the MCP server.""" payload = { "jsonrpc": "2.0", "method": method, "params": params or {}, "id": 1 } response = requests.post(BASE_URL, json=payload) return response.json() def main(): print("=== Testing News Bulletins Streaming ===\n") # Step 1: Connect to TWS print("1. Connecting to TWS...") result = make_request("tools/call", { "name": "ibkr_connect", "arguments": { "host": "127.0.0.1", "port": 7497, "clientId": 1 } }) print(f" Connection result: {json.dumps(result, indent=2)}\n") # Step 2: Start news resource print("2. Starting news bulletins resource...") result = make_request("tools/call", { "name": "ibkr_start_news_resource", "arguments": { "allMessages": True } }) print(f" Start result: {json.dumps(result, indent=2)}\n") # Step 3: Wait for news bulletins to arrive print("3. Waiting for news bulletins (30 seconds)...") print(" (Check TWS Station News tab to see if news is flowing)") time.sleep(30) # Step 4: Read the news resource print("\n4. Reading news bulletins resource...") result = make_request("resources/read", { "uri": "ibkr://news-bulletins" }) print(f" News bulletins: {json.dumps(result, indent=2)}\n") # Step 5: List active streams print("5. Listing active resource streams...") result = make_request("tools/call", { "name": "ibkr_list_active_resource_streams", "arguments": {} }) print(f" Active streams: {json.dumps(result, indent=2)}\n") print("=== Test Complete ===") if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/haymant/tws-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server