#!/usr/bin/env python3
import asyncio
import os
import sys
import time

from amicus.websocket_client import SynapseWebSocketClient
from amicus.server import update_state, run
async def _stop_server(process, timeout=5.0):
    """Stop the server subprocess and collect what it wrote to its pipes.

    Args:
        process: The ``asyncio.subprocess.Process`` that was started with
            piped stdout/stderr.
        timeout: Seconds to wait after ``terminate()`` before escalating
            to ``kill()``.

    Returns:
        The ``(stdout, stderr)`` tuple returned by ``Process.communicate()``.
    """
    if process.returncode is None:
        try:
            process.terminate()
        except ProcessLookupError:
            # The process exited between the returncode check and the signal.
            pass
    try:
        # communicate() drains the pipes while waiting; a bare wait() can
        # block forever if the child is stuck writing to a full pipe.
        return await asyncio.wait_for(process.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        # The server did not exit after terminate(); force it down.
        try:
            process.kill()
        except ProcessLookupError:
            pass
        # Output consumed before the timeout may be missing from this result.
        return await process.communicate()


async def main():
    """Run an end-to-end check of the ``state_updated`` event flow.

    Starts ``amicus.websocket_server`` as a subprocess, connects a
    ``SynapseWebSocketClient``, subscribes it to ``state_updated``, calls
    ``update_state.fn`` with the summary ``"Test summary"`` and waits up to
    five seconds for a matching event to reach the subscriber.

    Progress and the verdict are printed. Errors are reported rather than
    raised, and the client and the server subprocess are always cleaned up.

    Returns:
        ``True`` if the expected event arrived in time, ``False`` otherwise.
    """
    # Run server in background
    server_command = [sys.executable, '-m', 'amicus.websocket_server']
    # Extend the inherited environment instead of replacing it: passing only
    # PYTHONPATH would strip PATH, HOME, SYSTEMROOT, virtualenv variables, etc.
    env = {**os.environ, 'PYTHONPATH': 'src'}
    server_process = await asyncio.create_subprocess_exec(
        *server_command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env=env,
    )
    print(f"Server starting with PID: {server_process.pid}")

    test_successful = asyncio.Event()
    received_event_data = None
    client = None
    client_task = None
    passed = False

    async def on_event(topic, data):
        """Record the latest event and flag success on the expected one."""
        nonlocal received_event_data
        print(f"Client received event on topic '{topic}': {data}")
        received_event_data = data
        if topic == 'state_updated' and "Test summary" in data.get("summary", ""):
            test_successful.set()

    # Everything after the spawn lives inside the try so that the server is
    # always stopped, even if startup, client construction or a cancel fails.
    try:
        await asyncio.sleep(2)
        if server_process.returncode is not None:
            raise RuntimeError(
                f"Server exited early with code {server_process.returncode}"
            )

        client = SynapseWebSocketClient(node_id="test-client")
        client_task = asyncio.create_task(client.connect())

        for _ in range(10):
            if client.connected:
                break
            # Surface a failed connect() right away instead of reporting it
            # as a generic timeout after the polling window.
            if client_task.done() and not client_task.cancelled():
                connect_error = client_task.exception()
                if connect_error is not None:
                    raise connect_error
            await asyncio.sleep(0.5)
        if not client.connected:
            raise asyncio.TimeoutError("Client failed to connect")
        print("Client connected.")

        await client.subscribe('state_updated', on_event)
        print("Client subscribed.")
        await asyncio.sleep(0.5)

        print("Calling update_state...")
        await update_state.fn(summary="Test summary", next_steps=[], active_files=[])
        print("update_state called.")

        print("Waiting for event...")
        await asyncio.wait_for(test_successful.wait(), timeout=5.0)
        passed = True
        print("Test PASSED")
    except asyncio.TimeoutError:
        print("Test FAILED: Timed out.")
        print(f"Received data: {received_event_data}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        try:
            if client is not None:
                try:
                    await client.disconnect()
                except Exception as e:
                    # A failed disconnect must not prevent server shutdown.
                    print(f"Client disconnect failed: {e}")
            if client_task is not None:
                client_task.cancel()
                # Await the cancelled task so it finishes unwinding and its
                # outcome is retrieved; gather() returns it as a value.
                await asyncio.gather(client_task, return_exceptions=True)
        finally:
            server_stdout, server_stderr = await _stop_server(server_process)
            if not passed:
                # Show what the server said to make failures diagnosable.
                for label, payload in (
                    ("stdout", server_stdout),
                    ("stderr", server_stderr),
                ):
                    if payload:
                        print(f"--- server {label} ---")
                        print(payload.decode(errors="replace"))
            print("Test finished.")
    return passed
# Script entry point: when executed directly, run the async main() to completion.
if __name__ == "__main__":
    test_coroutine = main()
    asyncio.run(test_coroutine)