#!/usr/bin/env python3
"""
Demo: WebSocket real-time communication between Amicus nodes.
Demonstrates the Month 2 features of the Phase 4 WebSocket implementation.
"""
import asyncio
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from amicus.websocket_server import SynapseWebSocketServer
from amicus.websocket_client import SynapseWebSocketClient
async def run_server():
    """Create the demo WebSocket server and await its ``start()`` coroutine.

    The server is bound to ``localhost:8765``, the address every client in
    this script connects to. How long ``start()`` takes to return is decided
    by ``SynapseWebSocketServer`` and is not visible from this file.
    """
    ws_server = SynapseWebSocketServer(port=8765, host="localhost")
    # Announce the endpoint before handing control to the server.
    print("π Starting WebSocket server on ws://localhost:8765")
    await ws_server.start()
async def run_bootstrap_manager():
    """Run the bootstrap manager node of the demo.

    Sequence:
      1. Sleep one second so the server coroutine gets a chance to start.
      2. Connect to ``ws://localhost:8765`` as ``"bootstrap-mgr"``.
      3. Subscribe to ``state.update`` and ``task.claimed``.
      4. After three seconds, publish one ``task.new`` event.
      5. Stay connected for ten more seconds to receive replies.

    ``client.disconnect()`` is awaited in a ``finally`` clause, so it runs
    even when connecting, subscribing or publishing raises.
    """
    await asyncio.sleep(1)  # Let the server start first
    client = SynapseWebSocketClient("bootstrap-mgr", "ws://localhost:8765")

    # Every ``state.update`` payload received, in arrival order.
    state_updates = []

    def on_state_update(data):
        """Log a worker's state update and record it."""
        print(f"📥 [Bootstrap] State update: {data}")
        state_updates.append(data)

    def on_task_claimed(data):
        """Log that a worker claimed a task."""
        # Must stay on one physical line: a plain (non-triple-quoted) f-string
        # cannot span a line break.
        print(f"✅ [Bootstrap] Task claimed: {data}")

    try:
        await client.connect()
        print("🟢 [Bootstrap] Connected")

        # Register handlers before publishing so no reply is missed.
        await client.subscribe("state.update", on_state_update)
        await client.subscribe("task.claimed", on_task_claimed)

        # Give the worker nodes time to connect and subscribe to "task.new".
        await asyncio.sleep(3)

        await client.publish("task.new", {
            "task_id": "task-001",
            "description": "Process data batch",
            "priority": "high",
        })
        print("📤 [Bootstrap] Published new task")

        # Keep the connection open so claims and state updates can arrive.
        await asyncio.sleep(10)
    finally:
        await client.disconnect()
        print("🔴 [Bootstrap] Disconnected")
async def run_worker(worker_id: str) -> None:
    """Run one worker node of the demo.

    The worker sleeps two seconds (so the server and bootstrap manager can
    start), connects to ``ws://localhost:8765`` under ``worker_id``, subscribes
    to ``task.new`` and then publishes five ``state.update`` events, one every
    two seconds, reporting progress from ``20%`` to ``100%``. Each ``task.new``
    event is answered with a ``task.claimed`` event published in the
    background.

    ``client.disconnect()`` is awaited in a ``finally`` clause, so it runs
    even when connecting, subscribing or publishing raises.

    Args:
        worker_id: Identifier passed to the client and included in every
            published payload.
    """
    await asyncio.sleep(2)  # Let the server and bootstrap manager start
    client = SynapseWebSocketClient(worker_id, "ws://localhost:8765")

    # Strong references to in-flight claim publishes. The event loop keeps
    # only weak references to tasks, so a task nobody else holds can be
    # garbage-collected before it finishes.
    pending_claims = set()

    def on_new_task(data):
        """Log the new task and claim it without blocking the callback."""
        # NOTE(review): the leading glyph is residue of a mis-decoded emoji;
        # the original character cannot be recovered from this file.
        print(f"π [{worker_id}] New task available: {data}")
        # The callback is synchronous, so the publish is scheduled as a task.
        # This requires a running event loop in the calling thread.
        claim = asyncio.create_task(client.publish("task.claimed", {
            "task_id": data.get("task_id"),
            "claimed_by": worker_id,
        }))
        pending_claims.add(claim)
        # Drop the reference once the publish has completed.
        claim.add_done_callback(pending_claims.discard)

    try:
        await client.connect()
        print(f"🟢 [{worker_id}] Connected")

        await client.subscribe("task.new", on_new_task)

        # Simulate work: five progress reports, two seconds apart.
        for step in range(1, 6):
            await asyncio.sleep(2)
            await client.publish("state.update", {
                "node_id": worker_id,
                "status": "working",
                "progress": f"{step * 20}%",
            })

        # Linger briefly so late events can still be handled.
        await asyncio.sleep(2)
    finally:
        await client.disconnect()
        print(f"🔴 [{worker_id}] Disconnected")
async def main():
    """Run the demo: server, bootstrap manager and two workers concurrently.

    The four coroutines are awaited together with
    ``asyncio.gather(..., return_exceptions=True)``, so a failure in one
    component does not cancel the others. Because that mode returns exceptions
    as results instead of raising them, each captured exception is reported on
    stderr afterwards; otherwise a failed component would go unnoticed.

    This coroutine finishes only when all four components have finished.
    Whether ``run_server()`` ever finishes depends on the server's ``start()``
    implementation, which is not visible from this file.
    """
    banner = "=" * 60
    print(banner)
    print("WebSocket Real-Time Communication Demo")
    print("Issue #20 - Phase 4 Month 2 Features")
    print(banner)
    print()

    # Labels are listed in the same order as the awaitables passed to gather().
    labels = ("server", "bootstrap-mgr", "worker-1", "worker-2")
    outcomes = await asyncio.gather(
        run_server(),
        run_bootstrap_manager(),
        run_worker("worker-1"),
        run_worker("worker-2"),
        return_exceptions=True,
    )

    # BaseException also catches a CancelledError captured by gather().
    for label, outcome in zip(labels, outcomes):
        if isinstance(outcome, BaseException):
            print(f"[{label}] failed: {outcome!r}", file=sys.stderr)
if __name__ == "__main__":
    # Script entry point: drive the demo on a fresh event loop and treat
    # Ctrl+C as a normal way to stop it, not as an error.
    demo = main()
    try:
        asyncio.run(demo)
    except KeyboardInterrupt:
        print("\nπ Demo stopped")