#!/usr/bin/env python3
import asyncio
import sys
import os
from amicus.websocket_client import SynapseWebSocketClient
from amicus.server import mcp, run as run_mcp_server
# This test needs to run the server and client in the same event loop
# to allow direct access to the server's tools.
async def main():
# Set the WebSocket server URL for the comm_layer
os.environ['AMICUS_WS_SERVER'] = 'ws://localhost:8765'
# Start the WebSocket server in the background
ws_server_command = [sys.executable, '-m', 'amicus.websocket_server']
ws_process = await asyncio.create_subprocess_exec(
*ws_server_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env={'PYTHONPATH': 'src'}
)
print(f"WebSocket server starting with PID: {ws_process.pid}")
await asyncio.sleep(2)
# Start the MCP server logic in a background task
# This doesn't block, but initializes the comm_layer
# We are not calling mcp.run() because that would block forever.
from amicus.server import UnifiedCommunicationLayer
import amicus.server
amicus.server.comm_layer = UnifiedCommunicationLayer(node_id="amicus-server")
test_successful = asyncio.Event()
received_data = None
async def on_event(topic, data):
nonlocal received_data
print(f"Event received: {data}")
received_data = data
test_successful.set()
# Setup and connect the client
client = SynapseWebSocketClient(node_id="test-client")
client_task = asyncio.create_task(client.connect())
try:
# Wait for client to connect
for _ in range(10):
if client.connected: break
await asyncio.sleep(0.5)
if not client.connected: raise asyncio.TimeoutError("Client connect timeout")
print("Client connected.")
await client.subscribe("state_updated", on_event)
print("Client subscribed.")
# Directly call the update_state tool function from the imported server
print("Calling update_state tool function...")
from amicus.server import update_state
await update_state.fn(summary="Final test summary", next_steps=[], active_files=[])
print("Tool function called.")
# Wait for the event to be received
await asyncio.wait_for(test_successful.wait(), timeout=5)
if received_data and "Final test summary" in received_data.get("summary", ""):
print("Test PASSED")
else:
print(f"Test FAILED: Incorrect data received: {received_data}")
except asyncio.TimeoutError:
print("Test FAILED: Timed out waiting for event.")
except Exception as e:
print(f"An unexpected error occurred: {e}")
finally:
# Cleanup
await client.disconnect()
client_task.cancel()
ws_process.terminate()
await ws_process.wait()
if __name__ == "__main__":
# A bit of a hack to make the global comm_layer available
from amicus.server import comm_layer
asyncio.run(main())