import pytest
import time
import threading
from amicus import core
from amicus import server as server_module
import json
from pathlib import Path
from types import SimpleNamespace
# The server module exposes its tools as FunctionTool wrappers; the plain
# callable lives on each wrapper's ``.fn`` attribute. Collect the ones these
# tests need under a single ``server`` namespace.
_TOOL_NAMES = ("update_state", "read_state")
server = SimpleNamespace(
    **{tool: getattr(server_module, tool).fn for tool in _TOOL_NAMES}
)
def test_concurrent_updates(temp_context_dir):
    """Check that simultaneous writers leave the state file valid.

    Three threads each issue five ``update_state`` calls. Afterwards the
    state must be readable through ``read_state`` and the file on disk must
    parse as JSON containing a ``summary`` key. Any exception raised by a
    worker fails the test.

    Args:
        temp_context_dir: Pytest fixture; not referenced in the body
            (presumably it isolates the context directory -- confirm
            against conftest).
    """
    core.set_tracking_enabled(True)

    # Exceptions raised inside a thread never reach pytest, so each worker
    # records its failures here and the main thread asserts on them below.
    errors = []

    def worker(worker_id):
        for step in range(5):
            try:
                server.update_state(
                    f"Agent {worker_id} update {step}",
                    [{"task": f"Next step {step}", "status": "TODO"}],
                    [f"file_{worker_id}_{step}.py"],
                )
                time.sleep(0.1)
            except Exception as exc:
                errors.append(f"Worker {worker_id} error: {exc}")

    threads = [threading.Thread(target=worker, args=(n,)) for n in range(3)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    assert not errors, "concurrent updates failed:\n" + "\n".join(errors)

    # Verify final state is readable through the server API.
    state_content = server.read_state()
    assert "Context Bus State" in state_content

    # Verify the actual file content is valid JSON.
    state_file = core.get_state_file()
    with open(state_file, encoding="utf-8") as f:
        data = json.load(f)
    assert "summary" in data
def test_read_while_writing(temp_context_dir):
    """Check that state can be read while another thread keeps writing it.

    A background thread calls ``update_state`` in a loop while the main
    thread calls ``read_state`` for about one second. Individual read or
    write failures are tolerated; the test passes if at least one read
    succeeds.

    Args:
        temp_context_dir: Pytest fixture; not referenced in the body
            (presumably it isolates the context directory -- confirm
            against conftest).
    """
    core.set_tracking_enabled(True)
    stop_event = threading.Event()

    def writer():
        i = 0
        while not stop_event.is_set():
            try:
                server.update_state(
                    f"Update {i}", [{"task": "Next", "status": "TODO"}], []
                )
                i += 1
            except Exception:
                # Best effort: a failed write is retried on the next pass;
                # this test only asserts on the reader side.
                pass
            time.sleep(0.01)

    writer_thread = threading.Thread(target=writer)
    writer_thread.start()

    success_count = 0
    last_error = None
    # Use the monotonic clock so a wall-clock adjustment cannot shorten or
    # stretch the one-second read window.
    deadline = time.monotonic() + 1
    try:
        while time.monotonic() < deadline:
            try:
                server.read_state()
                success_count += 1
            except Exception as exc:
                # Tolerated, but remembered for the failure message.
                last_error = exc
            time.sleep(0.02)
    finally:
        # Always stop and reap the writer, even if the reader loop is
        # interrupted, so no thread is left running after the test.
        stop_event.set()
        writer_thread.join()

    assert success_count > 0, f"no read succeeded; last error: {last_error!r}"