#!/usr/bin/env python3
"""
WebSocket client for Synapse nodes.
Connects to the WebSocket server for real-time communication.
Implements Phase 4 from docs/REALTIME_COMMUNICATION_RESEARCH.md
"""
import asyncio
import json
import time
from typing import Optional, Callable, Dict, Any
import websockets
from websockets.client import WebSocketClientProtocol
class SynapseWebSocketClient:
"""
WebSocket client for Synapse nodes.
Connects to central WebSocket server for real-time pub/sub messaging.
"""
def __init__(
self,
node_id: str,
server_url: str = "ws://localhost:8765",
reconnect_interval: float = 5.0
):
self.node_id = node_id
self.server_url = server_url
self.reconnect_interval = reconnect_interval
# Connection state
self.websocket: Optional[WebSocketClientProtocol] = None
self.connected = False
self.running = False
# Message handlers
self.event_handlers: Dict[str, Callable] = {}
self.response_handlers: Dict[str, Callable] = {}
# Subscriptions
self.subscriptions: set = set()
# Statistics
self.message_count = 0
self.last_heartbeat = 0
async def connect(self):
"""Connect to WebSocket server with automatic reconnection"""
self.running = True
while self.running:
try:
async with websockets.connect(
self.server_url,
ping_interval=30,
ping_timeout=10
) as websocket:
self.websocket = websocket
self.connected = True
print(f"[{self.node_id}] Connected to {self.server_url}")
# Register with server
await self._register()
# Resubscribe to topics
await self._resubscribe()
# Start heartbeat task
heartbeat_task = asyncio.create_task(self._heartbeat_loop())
# Message receive loop
try:
async for message in websocket:
await self._handle_message(message)
except websockets.exceptions.ConnectionClosed:
print(f"[{self.node_id}] Connection closed")
finally:
heartbeat_task.cancel()
self.connected = False
except Exception as e:
print(f"[{self.node_id}] Connection error: {e}")
self.connected = False
# Reconnect after interval
if self.running:
print(f"[{self.node_id}] Reconnecting in {self.reconnect_interval}s...")
await asyncio.sleep(self.reconnect_interval)
async def disconnect(self):
"""Disconnect from server"""
self.running = False
if self.websocket:
await self.websocket.close()
self.connected = False
async def _register(self):
"""Send registration message to server"""
await self.websocket.send(json.dumps({
'type': 'register',
'node_id': self.node_id
}))
# Wait for acknowledgment
response = await asyncio.wait_for(
self.websocket.recv(),
timeout=5.0
)
msg = json.loads(response)
if msg.get('type') == 'registered':
print(f"[{self.node_id}] Registered with server")
else:
raise Exception(f"Registration failed: {msg}")
async def _resubscribe(self):
"""Resubscribe to topics after reconnection"""
for topic in self.subscriptions:
await self.websocket.send(json.dumps({
'type': 'subscribe',
'topic': topic
}))
async def _heartbeat_loop(self):
"""Send periodic heartbeat to server"""
while self.connected:
try:
await self.websocket.send(json.dumps({
'type': 'heartbeat'
}))
self.last_heartbeat = time.time()
await asyncio.sleep(25) # Send every 25 seconds
except Exception as e:
print(f"[{self.node_id}] Heartbeat error: {e}")
break
async def _handle_message(self, message: str):
"""Handle incoming message from server"""
try:
msg = json.loads(message)
msg_type = msg.get('type')
if msg_type == 'event':
# Handle pub/sub event
topic = msg.get('topic')
data = msg.get('data', {})
if topic in self.event_handlers:
handler = self.event_handlers[topic]
asyncio.create_task(handler(topic, data))
elif msg_type == 'query_response':
# Handle query response
request_id = msg.get('request_id')
data = msg.get('data', {})
if request_id in self.response_handlers:
handler = self.response_handlers.pop(request_id)
asyncio.create_task(handler(data))
elif msg_type == 'heartbeat_ack':
# Heartbeat acknowledged
pass
elif msg_type in ('subscribed', 'unsubscribed'):
# Subscription confirmed
pass
else:
print(f"[{self.node_id}] Unknown message type: {msg_type}")
self.message_count += 1
except json.JSONDecodeError:
print(f"[{self.node_id}] Invalid JSON received")
except Exception as e:
print(f"[{self.node_id}] Error handling message: {e}")
async def subscribe(self, topic: str, handler: Callable):
"""
Subscribe to a topic and register event handler.
Args:
topic: Topic to subscribe to
handler: Async function to call when events arrive
"""
self.subscriptions.add(topic)
self.event_handlers[topic] = handler
if self.connected:
await self.websocket.send(json.dumps({
'type': 'subscribe',
'topic': topic
}))
async def unsubscribe(self, topic: str):
"""
Unsubscribe from a topic.
Args:
topic: Topic to unsubscribe from
"""
self.subscriptions.discard(topic)
if topic in self.event_handlers:
del self.event_handlers[topic]
if self.connected:
await self.websocket.send(json.dumps({
'type': 'unsubscribe',
'topic': topic
}))
async def publish(self, topic: str, data: Dict[str, Any]):
"""
Publish an event to a topic.
Args:
topic: Topic to publish to
data: Event data (must be JSON-serializable)
"""
if not self.connected:
raise Exception("Not connected to server")
await self.websocket.send(json.dumps({
'type': 'publish',
'topic': topic,
'data': data
}))
async def query(self, query_type: str) -> Dict[str, Any]:
"""
Query server for information.
Args:
query_type: Type of query ('nodes', 'stats', etc.)
Returns:
Query response data
"""
if not self.connected:
raise Exception("Not connected to server")
request_id = f"{self.node_id}-{time.time()}"
# Create future for response
future = asyncio.Future()
async def response_handler(data):
future.set_result(data)
self.response_handlers[request_id] = response_handler
# Send query
await self.websocket.send(json.dumps({
'type': 'query',
'query': query_type,
'request_id': request_id
}))
# Wait for response (with timeout)
try:
result = await asyncio.wait_for(future, timeout=10.0)
return result
except asyncio.TimeoutError:
if request_id in self.response_handlers:
del self.response_handlers[request_id]
raise Exception("Query timeout")
# Example usage
async def example():
"""Example usage of WebSocket client"""
client = SynapseWebSocketClient(
node_id="example-node",
server_url="ws://localhost:8765"
)
# Define event handler
async def task_handler(topic: str, data: dict):
print(f"Task event received: {data}")
# Start connection (in background)
connect_task = asyncio.create_task(client.connect())
# Wait for connection
while not client.connected:
await asyncio.sleep(0.1)
# Subscribe to task events
await client.subscribe('tasks', task_handler)
# Publish an event
await client.publish('tasks', {'action': 'claimed', 'task_id': '123'})
# Query server stats
stats = await client.query('stats')
print(f"Server stats: {stats}")
# Run for a bit
await asyncio.sleep(10)
# Disconnect
await client.disconnect()
await connect_task
if __name__ == '__main__':
asyncio.run(example())