"""Tests for WebSocket server and client."""
import asyncio
import json
import pytest
from pathlib import Path
import sys
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from amicus.websocket_server import SynapseWebSocketServer
from amicus.websocket_client import SynapseWebSocketClient
@pytest.mark.asyncio
async def test_server_startup():
    """Test that a freshly started server has no clients and no messages.

    The server runs as a background task on 127.0.0.1:8766. The task is
    cancelled and awaited in a ``finally`` block so it never outlives the
    test, even when one of the assertions fails.
    """
    server = SynapseWebSocketServer(host="127.0.0.1", port=8766)

    # Start server in background so the test body can inspect its state.
    server_task = asyncio.create_task(server.start())
    try:
        # Give it time to start
        await asyncio.sleep(0.5)

        # Nothing has connected or sent anything yet.
        assert len(server.clients) == 0
        assert server.message_count == 0
    finally:
        # Cleanup must run on failure too; awaiting the task lets the
        # cancellation actually complete before the test returns.
        server_task.cancel()
        try:
            await server_task
        except asyncio.CancelledError:
            pass
@pytest.mark.asyncio
async def test_client_connection():
    """Test that a client can connect, register, and disconnect.

    A server is started on 127.0.0.1:8767 and a client with node id
    ``test-node`` connects to it. The test checks that the server tracks the
    client while connected and drops it after ``disconnect()``.

    Both background tasks are cancelled and awaited in ``finally`` so that a
    failing assertion does not leave them running.
    """
    server = SynapseWebSocketServer(host="127.0.0.1", port=8767)
    client = SynapseWebSocketClient(
        node_id="test-node",
        server_url="ws://127.0.0.1:8767",
    )

    # Start server, give it time to come up, then start the client.
    server_task = asyncio.create_task(server.start())
    await asyncio.sleep(0.5)
    client_task = asyncio.create_task(client.connect())

    try:
        # Poll for the connection for up to ~1 second (10 x 0.1 s).
        for _ in range(10):
            if client.connected:
                break
            await asyncio.sleep(0.1)

        assert client.connected
        assert len(server.clients) == 1
        assert "test-node" in server.node_lookup

        # Disconnecting should remove the client from the server's view.
        await client.disconnect()
        await asyncio.sleep(0.1)
        assert len(server.clients) == 0
    finally:
        # Stop the client before the server, then wait for both tasks to
        # finish. return_exceptions=True reports the cancellations as results
        # instead of raising them out of the teardown.
        client_task.cancel()
        server_task.cancel()
        await asyncio.gather(client_task, server_task, return_exceptions=True)
@pytest.mark.asyncio
async def test_pub_sub():
    """Test publish/subscribe between two clients.

    ``node-2`` subscribes to the ``tasks`` topic with a recording handler,
    ``node-1`` publishes one event to that topic, and the test asserts that
    exactly that event (topic and payload) reached the handler.

    All background tasks are cancelled and awaited in ``finally`` so they do
    not outlive the test, whether or not the assertions pass.
    """
    server = SynapseWebSocketServer(host="127.0.0.1", port=8768)
    client1 = SynapseWebSocketClient("node-1", "ws://127.0.0.1:8768")
    client2 = SynapseWebSocketClient("node-2", "ws://127.0.0.1:8768")

    # Start server, give it time to come up, then start both clients.
    server_task = asyncio.create_task(server.start())
    await asyncio.sleep(0.5)
    client1_task = asyncio.create_task(client1.connect())
    client2_task = asyncio.create_task(client2.connect())

    try:
        # Poll for both connections for up to ~1 second (10 x 0.1 s).
        for _ in range(10):
            if client1.connected and client2.connected:
                break
            await asyncio.sleep(0.1)

        assert client1.connected and client2.connected

        # Handler for client2 that records every (topic, data) it is given.
        received_events = []

        async def task_handler(topic: str, data: dict):
            received_events.append((topic, data))

        # Subscribe client2 to 'tasks' and allow the subscription to settle.
        await client2.subscribe('tasks', task_handler)
        await asyncio.sleep(0.2)

        # Client1 publishes to 'tasks'; allow time for delivery.
        await client1.publish('tasks', {'action': 'created', 'id': 'task-123'})
        await asyncio.sleep(0.2)

        # Verify client2 received exactly the published event.
        assert len(received_events) == 1
        topic, data = received_events[0]
        assert topic == 'tasks'
        assert data['action'] == 'created'
        assert data['id'] == 'task-123'

        await client1.disconnect()
        await client2.disconnect()
    finally:
        # Cancel every background task and wait for them to finish.
        # return_exceptions=True reports the cancellations as results instead
        # of raising them out of the teardown.
        background = (client1_task, client2_task, server_task)
        for task in background:
            task.cancel()
        await asyncio.gather(*background, return_exceptions=True)
@pytest.mark.asyncio
async def test_query():
    """Test the client's ``query`` call for the node list and server stats.

    With a single connected client (``test-node``), ``query('nodes')`` must
    return a one-element list describing that node and ``query('stats')``
    must return a dict containing ``uptime`` and a ``client_count`` of 1.

    Background tasks are cancelled and awaited in ``finally`` so they do not
    outlive the test, whether or not the assertions pass.
    """
    server = SynapseWebSocketServer(host="127.0.0.1", port=8769)
    client = SynapseWebSocketClient("test-node", "ws://127.0.0.1:8769")

    # Start server, give it time to come up, then start the client.
    server_task = asyncio.create_task(server.start())
    await asyncio.sleep(0.5)
    client_task = asyncio.create_task(client.connect())

    try:
        # Poll for the connection for up to ~1 second (10 x 0.1 s).
        for _ in range(10):
            if client.connected:
                break
            await asyncio.sleep(0.1)

        assert client.connected

        # Query for node list
        nodes = await client.query('nodes')
        assert isinstance(nodes, list)
        assert len(nodes) == 1
        assert nodes[0]['node_id'] == 'test-node'

        # Query for stats
        stats = await client.query('stats')
        assert isinstance(stats, dict)
        assert 'uptime' in stats
        assert 'client_count' in stats
        assert stats['client_count'] == 1

        await client.disconnect()
    finally:
        # Cancel both background tasks and wait for them to finish.
        # return_exceptions=True reports the cancellations as results instead
        # of raising them out of the teardown.
        client_task.cancel()
        server_task.cancel()
        await asyncio.gather(client_task, server_task, return_exceptions=True)
@pytest.mark.asyncio
async def test_reconnection():
    """Test that the client reconnects after the server is restarted.

    The client is created with ``reconnect_interval=1.0``. After the initial
    connection the server task is cancelled, the client is expected to report
    itself disconnected, and a new server instance is started on the same
    port. The client must then be connected again within about 3 seconds.

    Background tasks are cancelled and awaited in ``finally`` so they do not
    outlive the test, whether or not the assertions pass.
    """
    server = SynapseWebSocketServer(host="127.0.0.1", port=8770)
    client = SynapseWebSocketClient(
        "test-node",
        "ws://127.0.0.1:8770",
        reconnect_interval=1.0,
    )

    # Start server, give it time to come up, then start the client.
    server_task = asyncio.create_task(server.start())
    await asyncio.sleep(0.5)
    client_task = asyncio.create_task(client.connect())

    try:
        # Poll for the initial connection for up to ~1 second (10 x 0.1 s).
        for _ in range(10):
            if client.connected:
                break
            await asyncio.sleep(0.1)

        assert client.connected

        # Simulate a server restart: stop the running server first.
        server_task.cancel()
        try:
            await server_task
        except asyncio.CancelledError:
            pass

        await asyncio.sleep(0.5)

        # Client should detect disconnection
        assert not client.connected

        # Restart server on the same host/port with a fresh instance.
        server = SynapseWebSocketServer(host="127.0.0.1", port=8770)
        server_task = asyncio.create_task(server.start())

        # Wait for reconnection (up to 3 seconds: 30 x 0.1 s).
        for _ in range(30):
            if client.connected:
                break
            await asyncio.sleep(0.1)

        assert client.connected
        assert len(server.clients) == 1

        await client.disconnect()
    finally:
        # server_task is whichever server task was created last. Cancelling a
        # task that has already finished is a no-op, so this is safe at any
        # point of failure. return_exceptions=True reports the cancellations
        # as results instead of raising them out of the teardown.
        client_task.cancel()
        server_task.cancel()
        await asyncio.gather(client_task, server_task, return_exceptions=True)
if __name__ == '__main__':
    # Allow running this file directly. Pass pytest's return value to
    # sys.exit so the process exit status reflects the test outcome instead
    # of always being 0.
    sys.exit(pytest.main([__file__, '-v']))