We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/up2itnow/ReadyTrader-Stocks'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
test_stream_lifecycle.py•936 B
import time
def test_ws_stream_restart_after_stop():
"""
Regression test:
Streams must be restartable after stop() (the stop flag must be cleared on start()).
"""
# Import locally so the test doesn't accidentally import network code paths elsewhere.
from marketdata.ws_streams import _WsStream # type: ignore
class DummyStream(_WsStream):
async def _run_async(self) -> None: # pragma: no cover
# keep running until stop is requested
import asyncio
while not self._stop.is_set():
await asyncio.sleep(0.01)
s = DummyStream()
s.start()
time.sleep(0.05)
assert s.status()["running"] is True
s.stop()
time.sleep(0.02)
assert s.status()["running"] is False
# restart should work
s.start()
time.sleep(0.05)
assert s.status()["running"] is True
s.stop()
assert s.status()["running"] is False