Skip to main content
Glama
test_realtime_jobs.py (2.72 kB)
"""Integration-style tests for the realtime job WebSocket client."""

import asyncio
import json

import pytest
import pytest_asyncio
from aiohttp import web

from neem.mcp.jobs.realtime import RealtimeJobClient


@pytest_asyncio.fixture
async def websocket_server(unused_tcp_port):
    """Run a throwaway aiohttp WebSocket server and yield its ``ws://`` URL.

    The single ``/ws`` route checks the handshake headers, sends a
    ``progress`` event followed by a ``completed`` event for ``job-123``, and
    then keeps the socket open until the peer closes it. The server runner is
    cleaned up once the dependent test has finished.
    """
    # Headers the client is expected to send, checked in this order.
    expected_headers = (
        ("Authorization", "Bearer test-token"),
        ("X-User-ID", "dev-user"),
        ("Sec-WebSocket-Protocol", "Bearer.dev-user"),
    )
    # Events played back to every connection, in order
    # (mimics the user channel broadcasting all job events).
    scripted_events = (
        {"job_id": "job-123", "type": "progress", "payload": {"step": 0}},
        {"job_id": "job-123", "type": "completed", "payload": {"result": "ok"}},
    )

    async def serve_job_events(request):
        socket = web.WebSocketResponse()
        await socket.prepare(request)

        # Auth sanity check: confirms the client's headers reached the server.
        # NOTE(review): these asserts run inside the server handler, so a
        # failure presumably shows up in the test as a timeout rather than as
        # this assertion -- confirm.
        for header_name, expected_value in expected_headers:
            assert request.headers.get(header_name) == expected_value

        # Give the client a moment to get ready before pushing the events.
        await asyncio.sleep(0.1)
        for event in scripted_events:
            await socket.send_json(event)

        # Hold the connection open; the loop ends when the client closes.
        async for _ in socket:
            pass

        await socket.close()
        return socket

    application = web.Application()
    application.router.add_get("/ws", serve_job_events)

    app_runner = web.AppRunner(application)
    await app_runner.setup()

    tcp_site = web.TCPSite(app_runner, "127.0.0.1", unused_tcp_port)
    await tcp_site.start()

    yield f"ws://127.0.0.1:{unused_tcp_port}/ws"

    await app_runner.cleanup()


@pytest.mark.asyncio
async def test_realtime_client_receives_events(websocket_server):
    """Events sent by the server are cached and retrievable per job."""
    job_id = "job-123"
    wait_seconds = 5.0

    client = RealtimeJobClient(
        websocket_url=websocket_server,
        token_provider=lambda: "test-token",
        dev_user_id="dev-user",
        heartbeat=2.0,
    )

    try:
        # Make sure the client is connected (triggers background connection).
        await client._ensure_connected()

        # Block until the job reaches a terminal state (events land in cache).
        terminal_wait = client.wait_for_terminal(job_id, timeout=wait_seconds)
        await asyncio.wait_for(terminal_wait, timeout=wait_seconds)

        # The job must now be reported as complete.
        assert client.is_job_complete(job_id)

        # Exactly the two scripted events should be in the cache.
        cached = await client.get_events(job_id)
        assert len(cached) == 2
        progress_event, completed_event = cached[0], cached[1]

        # First event: the progress update.
        assert progress_event.event_type == "progress"
        assert progress_event.job_id == job_id
        assert progress_event.payload["step"] == 0

        # Second event: the completion notice.
        assert completed_event.event_type == "completed"
        assert completed_event.payload["result"] == "ok"
    finally:
        await client.close()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sophia-labs/mnemosyne-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.