# WebSocket Usage Guide
This guide covers the WebSocket real-time communication system (Issue #20, Phase 4).
## Overview
The WebSocket system enables real-time communication between Amicus nodes, supporting:
- **Pub/sub messaging**: Nodes can publish and subscribe to events
- **Multi-host deployments**: Nodes can run on different machines
- **Auto-reconnection**: Clients automatically reconnect if the connection drops
- **Authentication**: JWT-based security (optional)
## Quick Start
### 1. Start WebSocket Server
```bash
# Start server on default port (8765)
python -m amicus.websocket_server
# Or specify host/port
python -m amicus.websocket_server --host 0.0.0.0 --port 9000
```
Or via CLI (if integrated):
```bash
amicus-mcp --ws-server --ws-port 8765
```
### 2. Connect Nodes
```python
import asyncio

from amicus.websocket_client import SynapseWebSocketClient


def on_task_update(data):
    """Handle incoming task.update events."""
    print(f"Task update: {data}")


async def main():
    # Create the client and connect to the server
    client = SynapseWebSocketClient("node-1", "ws://localhost:8765")
    await client.connect()

    # Subscribe to events
    await client.subscribe("task.update", on_task_update)

    # Publish events
    await client.publish("task.claimed", {
        "task_id": "task-123",
        "claimed_by": "node-1"
    })

    # Disconnect when done
    await client.disconnect()


asyncio.run(main())
```
## Event Types
Standard event topics:
| Topic | Description | Data Fields |
|-------|-------------|-------------|
| `state.update` | State file changed | `node_id`, `status`, ... |
| `task.new` | New task available | `task_id`, `description`, `priority` |
| `task.claimed` | Task was claimed | `task_id`, `claimed_by` |
| `task.completed` | Task finished | `task_id`, `outcome`, `duration` |
| `node.registered` | Node joined cluster | `node_id`, `role`, `model` |
| `node.terminated` | Node left cluster | `node_id`, `reason` |
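The table above can be turned into a small check that catches malformed events before they are published. The sketch below is illustrative only: `REQUIRED_FIELDS` and `missing_fields` are hypothetical names, not part of the Amicus API, and the `state.update` entry lists only the two fields the table names explicitly.

```python
# Hypothetical helper: required payload fields per standard topic,
# transcribed from the table above (not part of the Amicus API).
REQUIRED_FIELDS: dict[str, tuple[str, ...]] = {
    "state.update": ("node_id", "status"),
    "task.new": ("task_id", "description", "priority"),
    "task.claimed": ("task_id", "claimed_by"),
    "task.completed": ("task_id", "outcome", "duration"),
    "node.registered": ("node_id", "role", "model"),
    "node.terminated": ("node_id", "reason"),
}


def missing_fields(topic: str, data: dict) -> list[str]:
    """Return the required fields absent from `data` (always empty for custom topics)."""
    return [field for field in REQUIRED_FIELDS.get(topic, ()) if field not in data]
```

Calling `missing_fields(topic, data)` before `client.publish(topic, data)` and refusing to publish when the list is non-empty is one way to keep payloads consistent across nodes.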
### Custom Topics
You can use any topic name:
```python
await client.subscribe("my.custom.event", handler)
await client.publish("my.custom.event", {"data": "value"})
```
## Demo Script
Run the included demo:
```bash
python examples/websocket_demo.py
```
This starts:
- 1 WebSocket server
- 1 Bootstrap manager node
- 2 Worker nodes
The demo then shows real-time pub/sub messaging between the nodes.
## Production Deployment
### With systemd
Create `/etc/systemd/system/amicus-websocket.service`:
```ini
[Unit]
Description=Amicus WebSocket Server
After=network.target
[Service]
Type=simple
User=amicus
WorkingDirectory=/opt/amicus
ExecStart=/usr/bin/python3 -m amicus.websocket_server --host 0.0.0.0 --port 8765
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
```
Enable and start:
```bash
sudo systemctl enable amicus-websocket
sudo systemctl start amicus-websocket
sudo systemctl status amicus-websocket
```
### With Docker
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY src/ ./src/
EXPOSE 8765
CMD ["python", "-m", "amicus.websocket_server", "--host", "0.0.0.0", "--port", "8765"]
```
Build and run:
```bash
docker build -t amicus-websocket .
docker run -d -p 8765:8765 --name amicus-ws amicus-websocket
```
## Security
### Enable TLS/SSL
```python
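from amicus.websocket_server import SynapseWebSocketServer  # assumed module path
from amicus.websocket_ssl import setup_ssl_context

# Build the TLS context from the server certificate and private key
ssl_context = setup_ssl_context(
    certfile="cert.pem",
    keyfile="key.pem"
)
server = SynapseWebSocketServer(
    host="0.0.0.0",
    port=8765,
    ssl_context=ssl_context
)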
```
Client connection:
```python
# Use wss:// for secure connection
client = SynapseWebSocketClient("node-1", "wss://server.example.com:8765")
```
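This guide does not show what `setup_ssl_context` does internally. As a rough sketch of an equivalent built only on Python's standard `ssl` module, the following may help when debugging certificate problems. `build_server_context` and `build_client_context` are hypothetical helper names, and how a client context would be handed to `SynapseWebSocketClient` is not covered here.

```python
import ssl
from typing import Optional


def build_server_context(certfile: Optional[str] = None,
                         keyfile: Optional[str] = None) -> ssl.SSLContext:
    """Server-side TLS context; loads the certificate pair when paths are given."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.minimum_version = ssl.TLSVersion.TLSv1_2  # refuse older protocol versions
    if certfile is not None:
        context.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return context


def build_client_context(cafile: Optional[str] = None) -> ssl.SSLContext:
    """Client-side context that verifies the server certificate and hostname.

    Pass `cafile` to trust a private or self-signed CA instead of the system store.
    """
    return ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)
```

With a self-signed `cert.pem`, a client would typically need that certificate (or its CA) passed as `cafile`; otherwise verification fails and the connection is rejected.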
### Enable JWT Authentication
```python
from amicus.websocket_auth import JWTAuthenticator
from amicus.websocket_client import SynapseWebSocketClient

# Load the secret from configuration in real deployments; never hard-code it
auth = JWTAuthenticator(secret_key="your-secret-key")

# Generate token for client
token = auth.generate_token("node-1", role="worker")

# Client uses token
client = SynapseWebSocketClient(
    node_id="node-1",
    server_url="ws://localhost:8765",
    auth_token=token
)
```
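For readers unfamiliar with JWTs, the sketch below shows how an HS256 token can be signed and verified with the standard library alone. It is not the `JWTAuthenticator` implementation, whose internals this guide does not describe; `sign_token`, `verify_token`, and the `sub`/`role`/`exp` claim layout are assumptions made for illustration. A maintained library such as PyJWT is the better choice in production.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(raw: bytes) -> str:
    """Base64url-encode without padding, as the JWT format requires."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(secret: str, signing_input: str) -> bytes:
    """HMAC-SHA256 over 'header.payload', keyed with the shared secret."""
    return hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()


def sign_token(secret: str, node_id: str, role: str, ttl_seconds: int = 3600) -> str:
    """Build an HS256-signed token carrying the node id, role, and expiry."""
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {"sub": node_id, "role": role, "exp": int(time.time()) + ttl_seconds}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_signature(secret, signing_input))}"


def verify_token(secret: str, token: str) -> dict:
    """Return the payload if the signature is valid and the token has not expired."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, signature_b64 = parts
    expected = _signature(secret, f"{header_b64}.{payload_b64}")
    # compare_digest avoids leaking information through timing differences
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    if payload["exp"] < time.time():
        raise ValueError("token expired")
    return payload
```

The sketch skips checks a real verifier should make, such as rejecting tokens whose header names an unexpected algorithm.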
## Monitoring
### Query Server Stats
```python
stats = await client.query_stats()
print(stats)
# {
#     "connected_clients": 5,
#     "subscriptions": 12,
#     "messages_sent": 1543,
#     "uptime_seconds": 3600,
#     "clients": [
#         {"node_id": "node-1", "connected_at": "2026-02-03T08:00:00Z"},
#         ...
#     ]
# }
```
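Raw counters are easier to compare across servers once they are turned into rates. A minimal sketch, assuming the stats dictionary has the fields shown in the sample output above (`summarize_stats` is a hypothetical helper, not part of the client API):

```python
def summarize_stats(stats: dict) -> dict:
    """Derive per-second and per-client rates from a stats dictionary."""
    uptime = stats["uptime_seconds"]
    clients = stats["connected_clients"]
    return {
        # Guard against division by zero right after startup or with no clients
        "messages_per_second": stats["messages_sent"] / uptime if uptime else 0.0,
        "subscriptions_per_client": stats["subscriptions"] / clients if clients else 0.0,
    }


sample = {"connected_clients": 5, "subscriptions": 12,
          "messages_sent": 1543, "uptime_seconds": 3600}
summary = summarize_stats(sample)
print(f"{summary['messages_per_second']:.3f} msg/s")             # 0.429 msg/s
print(f"{summary['subscriptions_per_client']:.1f} subs/client")  # 2.4 subs/client
```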
### Server Logs
The server logs connection events, errors, and published messages:
```
✓ WebSocket server listening on ws://localhost:8765
Client connected from ('192.168.1.100', 54321)
Registered: worker-1
Subscription: worker-1 -> task.new
Published: task.new (3 subscribers)
```
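Log lines like these can be scanned for publishes that reached nobody, which is often the first clue when a node reports missing messages. The sketch below assumes the `Published:` line format is exactly as in the excerpt above; `unheard_topics` is a hypothetical helper name.

```python
import re
from collections import Counter

# Matches lines such as "Published: task.new (3 subscribers)"
PUBLISHED = re.compile(r"Published: (?P<topic>\S+) \((?P<count>\d+) subscribers?\)")


def unheard_topics(log_lines: list[str]) -> Counter:
    """Count, per topic, the publishes that were delivered to zero subscribers."""
    unheard: Counter = Counter()
    for line in log_lines:
        match = PUBLISHED.search(line)
        if match and int(match["count"]) == 0:
            unheard[match["topic"]] += 1
    return unheard
```

A topic that shows up in the result is being published but has no matching subscription, for example because of a typo in the topic name on the subscriber side.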
## Troubleshooting
### Connection Refused
**Problem**: `ConnectionRefusedError: [Errno 61] Connection refused` (errno 61 on macOS; Linux reports errno 111)
**Solutions**:
- Verify server is running: `netstat -an | grep 8765`
- Open the port in the firewall, for example with ufw: `sudo ufw allow 8765`
- Verify correct host/port in client URL
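The reachability check can also be scripted from the machine where the client runs, which exercises the same network path as the client itself. A minimal sketch using only the standard library (`is_port_open` is a hypothetical helper name):

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers ConnectionRefusedError, timeouts, and DNS failures
        return False
```

For the default setup, `is_port_open("localhost", 8765)` returning `False` means the server is not listening or a firewall is blocking the port. A `True` result only confirms the TCP port is reachable, not that the WebSocket handshake or authentication will succeed.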
### No Messages Received
**Problem**: Client connected but not receiving messages
**Solutions**:
- Verify subscription: `await client.subscribe("topic", handler)`
- Check topic name matches publisher
- Ensure the handler function is defined before it is passed to `subscribe`
### Reconnection Loop
**Problem**: Client keeps reconnecting
**Solutions**:
- Check server logs for authentication errors
- Verify JWT token if auth enabled
- Increase reconnection delay in client
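How the reconnection delay is configured on `SynapseWebSocketClient` is not documented in this guide. As a general illustration of a delay policy that avoids tight reconnect loops, here is a sketch of exponential backoff with jitter; `backoff_delay` and its parameters are hypothetical.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  jitter: float = 0.1) -> float:
    """Delay in seconds before reconnect attempt `attempt` (0-based).

    The delay doubles from `base` up to `cap`; up to `jitter` (a fraction of
    the delay) of random extra wait is added so that many clients do not
    reconnect in lockstep after a server restart.
    """
    delay = min(cap, base * 2 ** min(attempt, 32))  # clamp the exponent
    return delay + random.uniform(0, jitter * delay)


# Without jitter the schedule doubles until it reaches the 60-second cap.
print([backoff_delay(n, jitter=0) for n in range(8)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```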
## Architecture
```
┌─────────────────────────────────────────────────────┐
│                  WebSocket Server                   │
│                     (Port 8765)                     │
│                                                     │
│      ┌──────────────┐         ┌──────────────┐      │
│      │  Connection  │         │   Pub/Sub    │      │
│      │   Manager    │         │    Router    │      │
│      └──────────────┘         └──────────────┘      │
└───────┬──────────────────┬──────────────────┬───────┘
        │                  │                  │
┌───────▼────────┐   ┌─────▼──────┐   ┌───────▼────────┐
│   Bootstrap    │   │  Worker 1  │   │    Worker 2    │
│    Manager     │   │            │   │                │
│                │   │            │   │                │
│ Subscribe:     │   │Subscribe:  │   │Subscribe:      │
│ - task.*       │   │- task.new  │   │- task.new      │
│ - node.*       │   │            │   │                │
│                │   │Publish:    │   │Publish:        │
│ Publish:       │   │- task.     │   │- task.         │
│ - task.new     │   │  claimed   │   │  claimed       │
│ - state.       │   │- state.    │   │- state.        │
│   update       │   │  update    │   │  update        │
└────────────────┘   └────────────┘   └────────────────┘
```
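The diagram shows the manager subscribing to patterns such as `task.*` while workers subscribe to exact topics. The server's actual matching rules are not specified in this guide; the sketch below shows one plausible way a pub/sub router could dispatch, using glob-style matching from the standard library. `TopicRouter` is a hypothetical in-memory stand-in, not the server's implementation.

```python
from collections import defaultdict
from fnmatch import fnmatchcase
from typing import Callable


class TopicRouter:
    """In-memory pub/sub router with glob-style topic patterns."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, pattern: str, handler: Callable[[dict], None]) -> None:
        """Register `handler` for every topic matching `pattern`."""
        self._handlers[pattern].append(handler)

    def publish(self, topic: str, data: dict) -> int:
        """Deliver `data` to every matching handler; return the delivery count."""
        delivered = 0
        for pattern, handlers in self._handlers.items():
            if fnmatchcase(topic, pattern):
                for handler in handlers:
                    handler(data)
                    delivered += 1
        return delivered


router = TopicRouter()
manager_inbox: list[dict] = []
worker_inbox: list[dict] = []
router.subscribe("task.*", manager_inbox.append)   # manager: all task events
router.subscribe("task.new", worker_inbox.append)  # worker: new tasks only

router.publish("task.new", {"task_id": "task-123"})      # reaches both, returns 2
router.publish("task.claimed", {"task_id": "task-123"})  # manager only, returns 1
```

Note that with glob matching `task.*` also matches deeper names such as `task.a.b`; a router that treats `.` as a strict level separator would need its own matching function.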
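## API Reference
The signatures below are abbreviated: the Security examples above also pass `ssl_context` (server) and `auth_token` (client) as keyword arguments.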
### Server API
```python
class SynapseWebSocketServer:
    def __init__(self, host: str = "localhost", port: int = 8765): ...
    async def start(self): ...
    def stop(self): ...
    def get_stats(self) -> dict: ...
```
### Client API
```python
from typing import Callable

class SynapseWebSocketClient:
    def __init__(self, node_id: str, server_url: str): ...
    async def connect(self): ...
    async def disconnect(self): ...
    async def subscribe(self, topic: str, handler: Callable): ...
    async def publish(self, topic: str, data: dict): ...
    async def query_stats(self) -> dict: ...
```
## Next Steps
- [x] Month 1: Core infrastructure (complete)
- [x] Month 2: Client integration and demo (complete)
- [ ] Month 3: Production features (HA, monitoring, security)
- [ ] Month 4: Web dashboard and advanced features
See `docs/REALTIME_COMMUNICATION_RESEARCH.md` for full roadmap.