#!/usr/bin/env python3
"""
Unified communication layer for Amicus Synapse nodes.
Selects the best available communication backend (WebSocket, IPC, etc.)
and provides a consistent API for publishing and subscribing to events.
"""
import os
import asyncio
from typing import Optional, Callable
# Import backends
from .websocket_client import SynapseWebSocketClient
class UnifiedCommunicationLayer:
    """Select and manage the primary communication backend for a Synapse node.

    The WebSocket backend is chosen when the ``AMICUS_WS_SERVER`` environment
    variable is set to a non-empty value. Otherwise ``backend`` stays ``None``
    and ``publish``, ``subscribe`` and ``disconnect`` do nothing.
    """

    def __init__(self, node_id: str):
        """Remember *node_id* and try to start a communication backend.

        Starting the WebSocket backend schedules a task on the running event
        loop, so when a server is configured this object must be constructed
        from code that is already running inside that loop. If no loop is
        running, initialization fails softly and ``backend`` is left ``None``.
        """
        self.node_id = node_id
        self.backend: Optional[SynapseWebSocketClient] = None
        # Strong reference to the background connect task. The event loop
        # itself only keeps weak references to tasks, so without this the
        # task could be garbage-collected while it is still running.
        self._connect_task: Optional[asyncio.Task] = None
        self._init_backend()

    def _init_backend(self):
        """Initialize the best available communication backend.

        Reads ``AMICUS_WS_SERVER``; when set, builds a
        ``SynapseWebSocketClient`` and schedules its ``connect()`` coroutine
        in the background. Any exception during setup is reported with
        ``print`` and leaves ``backend`` as ``None``.
        """
        ws_server_url = os.getenv("AMICUS_WS_SERVER")
        if not ws_server_url:
            print(f"[{self.node_id}] No real-time communication backend configured. Polling will be used.")
            return

        print(f"[{self.node_id}] WebSocket server configured. Initializing WebSocket backend.")
        try:
            backend = SynapseWebSocketClient(self.node_id, ws_server_url)
            # Look the loop up *before* creating the connect() coroutine:
            # if no loop is running this raises RuntimeError right here and
            # no coroutine object is left behind un-awaited.
            loop = asyncio.get_running_loop()
            # connect() runs its own loop, so it is scheduled in the
            # background instead of being awaited.
            self._connect_task = loop.create_task(backend.connect())
        except Exception as e:
            print(f"[{self.node_id}] Failed to initialize WebSocket backend: {e}")
            self.backend = None
            self._connect_task = None
        else:
            self.backend = backend
            print(f"[{self.node_id}] WebSocket backend started.")

    async def publish(self, topic: str, data: dict):
        """Publish *data* on *topic* through the backend.

        Does nothing when there is no backend or the backend does not
        report itself as connected.
        """
        if self.backend and self.backend.connected:
            await self.backend.publish(topic, data)

    async def subscribe(self, topic: str, handler: Callable):
        """Register *handler* for *topic* on the backend, if one exists."""
        if self.backend:
            await self.backend.subscribe(topic, handler)

    async def disconnect(self):
        """Disconnect the backend, if one exists."""
        if self.backend:
            await self.backend.disconnect()