#!/usr/bin/env python3
"""
Amicus Swarm Simulation
=======================
This script simulates a swarm of AI agents interacting with the Amicus Context Bus.
It validates the multi-agent coordination primitives:
- Task claiming/completion
- State locking
- Message broadcasting
- Heartbeats
Usage:
python3 scripts/simulate_swarm.py --agents 5 --tasks 20
"""
import argparse
import random
import sys
import threading
import time
from pathlib import Path
# Add src to path
src_path = Path(__file__).parent.parent / "src"
sys.path.insert(0, str(src_path))
from amicus.server import (
update_state,
read_state,
add_task,
claim_task,
complete_task,
broadcast_message,
heartbeat
)
from amicus.core import set_tracking_enabled, get_context_bus_dir
# Colors for terminal output
COLORS = [
"\033[31m", "\033[32m", "\033[33m", "\033[34m", "\033[35m", "\033[36m"
]
RESET = "\033[0m"
def agent_worker(agent_id: str, color: str):
"""Simulates a single agent's lifecycle."""
print(f"{color}[{agent_id}] Online.{RESET}")
# Send initial heartbeat
heartbeat.fn()
while True:
# random delay to simulate work/thinking
time.sleep(random.uniform(0.5, 2.0))
# 1. Read State
try:
state_json = read_state.fn() # Returns formatted string
# For this sim, we mostly rely on the tool returns
except Exception as e:
print(f"{color}[{agent_id}] Error reading state: {e}{RESET}")
continue
# 2. Check for pending tasks (naive parsing of the string output for this demo)
# Try to claim a random task index (1 to 20)
task_idx = random.randint(1, 20)
claim_result = claim_task.fn(task_idx, agent_id)
if "claimed by" in claim_result and "Error" not in claim_result:
print(f"{color}[{agent_id}] Claimed Task #{task_idx}{RESET}")
# Simulate work
broadcast_message.fn(f"Node {agent_id} starting work on Task #{task_idx}")
time.sleep(random.uniform(1.0, 3.0))
heartbeat.fn() # Keep alive
# Complete task
complete_result = complete_task.fn(task_idx, agent_id, outcome="Simulated Success")
print(f"{color}[{agent_id}] Finished Task #{task_idx}: {complete_result}{RESET}")
elif "Error" in claim_result:
# Task likely invalid or taken, just idle
pass
# 5% chance to add a new subtask
if random.random() < 0.05:
new_task = f"Subtask generated by {agent_id}"
add_task.fn(new_task, priority="low")
print(f"{color}[{agent_id}] Added new task: {new_task}{RESET}")
# Stop condition (check if we should exit - simplified for demo)
# In a real test we'd check if all tasks are done.
def main():
parser = argparse.ArgumentParser(description="Run an Amicus agent swarm simulation.")
parser.add_argument("--agents", type=int, default=3, help="Number of concurrent agents")
parser.add_argument("--tasks", type=int, default=10, help="Initial tasks to create")
args = parser.parse_args()
print("โก Initializing Amicus Swarm Simulation...")
set_tracking_enabled(True)
# 1. seed tasks
print(f"๐ Seeding {args.tasks} tasks...")
update_state.fn(
summary="Swarm Simulation Started",
next_steps=[], # clear old
active_files=[],
messages=[]
)
for i in range(args.tasks):
add_task.fn(f"Simulation Task #{i+1}", priority="medium")
# 2. Start Agents
threads = []
print(f"๐ Launching {args.agents} agents...")
stop_event = threading.Event()
for i in range(args.agents):
agent_id = f"Node-{chr(65+i)}" # Node-A, Node-B...
color = COLORS[i % len(COLORS)]
t = threading.Thread(target=agent_worker, args=(agent_id, color), daemon=True)
threads.append(t)
t.start()
try:
# Run for a fixed time or until Ctrl+C
start_time = time.time()
duration = 15 # seconds
while time.time() - start_time < duration:
time.sleep(1)
print(".", end="", flush=True)
print("\n\nโฑ๏ธ Simulation time up.")
except KeyboardInterrupt:
print("\n๐ Simulation stopped by user.")
print("๐ Final State Summary:")
print(read_state.fn())
if __name__ == "__main__":
main()