#!/usr/bin/env python3
"""
Anti-Idle System Demonstration
This script simulates the lifecycle of a multi-node Synapse cluster
with the anti-idle system active.
"""
import time
import sys
from pathlib import Path
# Add the project root (the parent of this script's directory) to sys.path so the `src.*` imports below resolve
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.amicus.core import get_state_file, read_with_lock, write_with_lock
from src.amicus.config import ConfigManager
from src.amicus.monitor import HeartbeatMonitor
def print_state_summary():
    """Print a human-readable snapshot of the shared cluster state.

    Reads the state file once and prints, in order: the cluster metadata
    counters, the most recent workload assessment (when present), one line
    per entry in ``cluster_nodes`` and one line per dict entry in
    ``next_steps``. Entries of ``next_steps`` that are not dicts are counted
    in the section header but not listed.
    """
    # Lookup tables are built once instead of once per loop iteration.
    node_icons = {
        "working": "🟢",
        "idle": "🟡",
        "waiting": "🔵",
        "terminated": "🔴",
    }
    task_icons = {
        "pending": "⏸️",
        "in_progress": "▶️",
        "completed": "✅",
        "failed": "❌",
    }
    separator = "=" * 60

    # The read can yield a falsy value when no state has been written yet
    # (simulate_registration guards against the same case); treat it as an
    # empty state rather than failing on ``.get``.
    state = read_with_lock(get_state_file()) or {}

    print("\n" + separator)
    print("CLUSTER STATE SNAPSHOT")
    print(separator)

    # Cluster metadata
    metadata = state.get("cluster_metadata", {})
    print(f"Active Nodes: {metadata.get('active_agent_count', 0)}/{metadata.get('max_agents', 4)}")
    print(f"Idle Nodes: {metadata.get('idle_agent_count', 0)}")
    print(f"Pending Tasks: {metadata.get('pending_task_count', 0)}")

    # Work distribution
    work_dist = state.get("work_distribution", {})
    if work_dist:
        print(f"\nWorkload Status: {work_dist.get('workload_status', 'unknown').upper()}")
        print(f"Recommendation: {work_dist.get('spawn_recommendation', 'none')}")

    # Nodes
    cluster_nodes = state.get("cluster_nodes", {})
    if cluster_nodes:
        print(f"\nNodes ({len(cluster_nodes)}):")
        for node_id, info in cluster_nodes.items():
            status = info.get("status", "unknown")
            role = info.get("role", "unknown")
            idle_since = info.get("idle_since")
            line = f" {node_icons.get(status, '⚪')} {node_id} ({role}) - {status}"
            if idle_since is not None:
                # idle_since is an epoch timestamp; show elapsed idle time.
                line += f" [{time.time() - idle_since:.1f}s idle]"
            print(line)

    # Tasks
    next_steps = state.get("next_steps", [])
    if next_steps:
        print(f"\nTasks ({len(next_steps)}):")
        for task in next_steps:
            if not isinstance(task, dict):
                continue
            task_desc = task.get("task", "No description")
            status = task.get("status", "pending")
            priority = task.get("priority", "medium")
            assigned = task.get("assigned_to", "")
            line = f" {task_icons.get(status, '❓')} [{priority}] {task_desc}"
            if assigned:
                line += f" (→ {assigned})"
            print(line)

    print(separator)
def simulate_registration(agent_id, role, model_name):
    """Simulate a node registering with the cluster.

    Args:
        agent_id: Identifier of the node being registered.
        role: Role stored on the node record. ``"bootstrap_manager"`` also
            records the node as ``manager_id`` in the cluster metadata.
        model_name: Model name stored on the node record.

    Returns:
        True when the node record was written to the state file, False when
        the registration was rejected because the cluster is at capacity.
    """
    print(f"\n📝 Registering {agent_id} as {role}...")
    state_file = get_state_file()
    state = read_with_lock(state_file)
    if not state:
        state = {"summary": "Cluster Initialized", "next_steps": [], "active_files": []}

    config = ConfigManager()
    cluster_settings = config.get("cluster_settings", {})

    # Initialize metadata if needed
    if "cluster_metadata" not in state:
        state["cluster_metadata"] = {
            "max_agents": cluster_settings.get("max_agents", 4),
            "active_agent_count": 0,
            "idle_agent_count": 0,
            "pending_task_count": 0,
            "manager_id": None,
        }
    metadata = state["cluster_metadata"]

    def count_live(nodes):
        # Every node that is not terminated occupies a capacity slot.
        return sum(1 for n in nodes.values() if n.get("status") != "terminated")

    # Check capacity. Only a node that already holds a slot may bypass the
    # limit; a terminated node coming back needs a free slot like any
    # newcomer, otherwise re-registration could push the cluster past
    # max_agents.
    cluster_nodes = state.get("cluster_nodes", {})
    max_agents = metadata.get("max_agents", 4)
    existing = cluster_nodes.get(agent_id)
    holds_slot = existing is not None and existing.get("status") != "terminated"
    if count_live(cluster_nodes) >= max_agents and not holds_slot:
        print(f"❌ Registration rejected: Cluster at capacity ({max_agents} nodes)")
        return False

    # Register node (a re-registration replaces the previous record)
    now = time.time()
    cluster_nodes[agent_id] = {
        "role": role,
        "model": {"name": model_name, "strength": "high"},
        "last_heartbeat": now,
        "status": "working",
        "current_task_id": None,
        "idle_since": None,
        "last_activity": now,
    }
    if role == "bootstrap_manager":
        metadata["manager_id"] = agent_id

    state["cluster_nodes"] = cluster_nodes
    metadata["active_agent_count"] = count_live(cluster_nodes)
    state["timestamp"] = now
    write_with_lock(state_file, state)
    print(f"✅ {agent_id} registered successfully")
    return True
def simulate_add_tasks(count, priority="medium"):
    """Append ``count`` pending tasks to the ``next_steps`` queue.

    Args:
        count: Number of tasks to append.
        priority: Priority label stored on every new task.

    Each task is numbered by its position in the queue at the time it is
    appended, and the updated state is written back to the state file.
    """
    print(f"\n📋 Adding {count} tasks (priority: {priority})...")
    state_file = get_state_file()
    # A falsy read (no state yet) is treated as an empty state so the queue
    # can still be created.
    state = read_with_lock(state_file) or {}
    steps = state.get("next_steps", [])
    for _ in range(count):
        steps.append({
            "task": f"Task {len(steps) + 1}: Sample work item",
            "status": "pending",
            "priority": priority,
            "created_at": time.time(),
        })
    state["next_steps"] = steps
    state["timestamp"] = time.time()
    write_with_lock(state_file, state)
    print(f"✅ Added {count} tasks")
def _classify_workload(pending_tasks, in_progress, active_agents, idle_agents, max_agents):
    """Map cluster counters to a ``(workload_status, recommendation)`` pair."""
    total_active = active_agents + idle_agents
    has_capacity = total_active < max_agents

    if pending_tasks == 0 and in_progress == 0:
        return "idle", "terminate_idle"
    # Overloaded: either a backlog of three or more tasks, or pending work
    # with nobody working on anything, as long as a new node still fits.
    if has_capacity and (pending_tasks >= 3 or (pending_tasks > 0 and active_agents == 0)):
        return "overloaded", "spawn_developer"
    if pending_tasks <= 1 and total_active > 1 and idle_agents > 0:
        return "underutilized", "terminate_idle"
    return "balanced", "maintain"


def simulate_assess_workload():
    """Simulate the Bootstrap Manager's workload assessment.

    Counts non-idle, idle and terminated nodes plus pending and in-progress
    tasks, classifies the workload, and stores the result under
    ``work_distribution`` together with refreshed counters under
    ``cluster_metadata``. ``active_agent_count`` is written as the number of
    non-terminated nodes (working plus idle).
    """
    print("\n🔍 Manager assessing workload...")
    state_file = get_state_file()
    # A falsy read (no state yet) is treated as an empty state.
    state = read_with_lock(state_file) or {}
    cluster_nodes = state.get("cluster_nodes", {})
    steps = state.get("next_steps", [])

    # Count metrics
    node_statuses = [n.get("status") for n in cluster_nodes.values()]
    task_statuses = [t.get("status") for t in steps if isinstance(t, dict)]
    idle_agents = node_statuses.count("idle")
    active_agents = sum(1 for s in node_statuses if s not in ("terminated", "idle"))
    pending_tasks = task_statuses.count("pending")
    in_progress = task_statuses.count("in_progress")
    max_agents = state.get("cluster_metadata", {}).get("max_agents", 4)

    workload_status, recommendation = _classify_workload(
        pending_tasks, in_progress, active_agents, idle_agents, max_agents
    )

    # Update state. Both sections are created on demand so an assessment
    # run before any registration does not raise KeyError.
    now = time.time()
    state.setdefault("work_distribution", {}).update({
        "last_assessment": now,
        "workload_status": workload_status,
        "spawn_recommendation": recommendation,
    })
    state.setdefault("cluster_metadata", {}).update({
        "active_agent_count": active_agents + idle_agents,
        "idle_agent_count": idle_agents,
        "pending_task_count": pending_tasks,
    })
    state["timestamp"] = now
    write_with_lock(state_file, state)
    print(f"✅ Assessment complete: {workload_status.upper()} → {recommendation}")
def simulate_claim_task(agent_id):
    """Simulate a node claiming the first pending task in the queue.

    Args:
        agent_id: Identifier of the node claiming work.

    Returns:
        True when a pending task was marked ``in_progress`` and assigned to
        ``agent_id``; False when the queue holds no pending task.

    When ``agent_id`` has a node record, that record is switched to
    ``working`` and its ``current_task_id`` is set to ``task-<n>``, where
    ``n`` is the task's 1-based position in ``next_steps``.
    """
    print(f"\n🎯 {agent_id} claiming task...")
    state_file = get_state_file()
    # A falsy read (no state yet) is treated as an empty state.
    state = read_with_lock(state_file) or {}
    steps = state.get("next_steps", [])

    # Find first pending task
    for position, task in enumerate(steps, start=1):
        if not (isinstance(task, dict) and task.get("status") == "pending"):
            continue

        task["status"] = "in_progress"
        task["assigned_to"] = agent_id
        task["claimed_at"] = time.time()

        # Update node status
        node = state.get("cluster_nodes", {}).get(agent_id)
        if node is not None:
            node["status"] = "working"
            node["current_task_id"] = f"task-{position}"
            node["idle_since"] = None

        state["timestamp"] = time.time()
        write_with_lock(state_file, state)
        print(f"✅ {agent_id} claimed: {task.get('task', 'No description')}")
        return True

    print(f"⚠️ No pending tasks available for {agent_id}")
    return False
def simulate_complete_task(agent_id):
    """Simulate a node completing the task it is currently working on.

    Args:
        agent_id: Identifier of the node finishing its work.

    Returns:
        True when an ``in_progress`` task assigned to ``agent_id`` was marked
        ``completed``; False when the node has no such task.
    """
    print(f"\n✅ {agent_id} completing task...")
    state_file = get_state_file()
    # A falsy read (no state yet) is treated as an empty state.
    state = read_with_lock(state_file) or {}
    steps = state.get("next_steps", [])

    # Find the task this agent is actively working on. Matching on
    # assigned_to alone would pick up tasks the agent already completed
    # earlier (they keep their assigned_to) and leave the real one stuck
    # in_progress.
    for task in steps:
        if not isinstance(task, dict):
            continue
        if task.get("assigned_to") != agent_id or task.get("status") != "in_progress":
            continue

        now = time.time()
        task["status"] = "completed"
        task["completed_by"] = agent_id
        task["completed_at"] = now

        # Update node status
        node = state.get("cluster_nodes", {}).get(agent_id)
        if node is not None:
            node["current_task_id"] = None
            node["last_activity"] = now

        state["timestamp"] = now
        write_with_lock(state_file, state)
        print(f"✅ {agent_id} completed: {task.get('task', 'No description')}")
        return True

    print(f"⚠️ {agent_id} has no active tasks")
    return False
def simulate_go_idle(agent_id):
    """Simulate a node going idle because no work is available.

    Args:
        agent_id: Identifier of the node to mark as idle.

    Returns:
        True when the node record was found and marked ``idle`` (with
        ``idle_since`` and ``last_activity`` set to the current time);
        False when no record exists for ``agent_id``.
    """
    print(f"\n🟡 {agent_id} going idle (no work available)...")
    state_file = get_state_file()
    # A falsy read (no state yet) is treated as an empty state.
    state = read_with_lock(state_file) or {}

    node = state.get("cluster_nodes", {}).get(agent_id)
    if node is None:
        print(f"❌ {agent_id} not found")
        return False

    now = time.time()
    node["status"] = "idle"
    node["idle_since"] = now
    node["last_activity"] = now
    state["timestamp"] = now
    write_with_lock(state_file, state)
    print(f"✅ {agent_id} marked as idle")
    return True
def simulate_terminate(agent_id):
    """Simulate a node terminating.

    Args:
        agent_id: Identifier of the node to terminate.

    Returns:
        True when the node record was found and marked ``terminated``;
        False when no record exists for ``agent_id``.

    The ``active_agent_count`` (non-terminated nodes) and
    ``idle_agent_count`` metadata counters are recomputed afterwards.
    """
    print(f"\n🔴 {agent_id} terminating...")
    state_file = get_state_file()
    # A falsy read (no state yet) is treated as an empty state.
    state = read_with_lock(state_file) or {}
    cluster_nodes = state.get("cluster_nodes", {})

    node = cluster_nodes.get(agent_id)
    if node is None:
        print(f"❌ {agent_id} not found")
        return False

    node["status"] = "terminated"

    # Update metadata; the section is created on demand so a state without
    # cluster_metadata does not raise KeyError.
    statuses = [n.get("status") for n in cluster_nodes.values()]
    metadata = state.setdefault("cluster_metadata", {})
    metadata["active_agent_count"] = sum(1 for s in statuses if s != "terminated")
    metadata["idle_agent_count"] = statuses.count("idle")

    state["timestamp"] = time.time()
    write_with_lock(state_file, state)
    print(f"✅ {agent_id} terminated")
    return True
def cleanup():
    """Delete the cluster state file when one is present."""
    path = get_state_file()
    if not path.exists():
        return
    path.unlink()
def run_demo():
    """Run the anti-idle system demonstration end to end.

    Starts from a clean state file and walks through seven phases:
    bootstrap, scale-up, task execution, task completion, idle detection,
    termination and capacity enforcement. A state summary is printed after
    each phase and the state file is removed at the end.
    """
    banner = "=" * 60
    model = "claude-sonnet-4-5"

    print("\n" + banner)
    print("ANTI-IDLE SYSTEM DEMONSTRATION")
    print(banner)

    # Clean start
    cleanup()

    # Phase 1: Bootstrap and initial workload
    print("\n### PHASE 1: Cluster Bootstrap ###")
    simulate_registration("Node-BM-001", "bootstrap_manager", model)
    simulate_add_tasks(5, priority="high")
    simulate_assess_workload()
    print_state_summary()
    time.sleep(1)

    # Phase 2: Spawn workers due to overload
    print("\n### PHASE 2: Workload Scaling Up ###")
    simulate_registration("Node-DEV-001", "developer", model)
    simulate_registration("Node-DEV-002", "developer", model)
    simulate_assess_workload()
    print_state_summary()
    time.sleep(1)

    # Phase 3: Workers claim and execute tasks
    print("\n### PHASE 3: Task Execution ###")
    simulate_claim_task("Node-DEV-001")
    simulate_claim_task("Node-DEV-002")
    simulate_assess_workload()
    print_state_summary()
    time.sleep(1)

    # Phase 4: Complete tasks, reducing workload
    print("\n### PHASE 4: Task Completion ###")
    simulate_complete_task("Node-DEV-001")
    simulate_complete_task("Node-DEV-002")
    simulate_claim_task("Node-DEV-001")  # Claim another
    simulate_complete_task("Node-DEV-001")
    simulate_assess_workload()
    print_state_summary()
    time.sleep(1)

    # Phase 5: Workers go idle
    print("\n### PHASE 5: Idle Detection ###")
    simulate_go_idle("Node-DEV-001")
    simulate_go_idle("Node-DEV-002")
    simulate_assess_workload()
    print_state_summary()
    time.sleep(1)

    # Phase 6: Graceful termination
    print("\n### PHASE 6: Graceful Termination ###")
    print("(Simulating 30-second grace period...)")
    simulate_terminate("Node-DEV-001")
    simulate_terminate("Node-DEV-002")
    simulate_assess_workload()
    print_state_summary()

    # Phase 7: Test capacity enforcement
    # Terminated nodes do not count toward capacity, so only the manager
    # holds a slot at this point. Assuming max_agents is the default of 4,
    # three more registrations fill the cluster and the fourth is the one
    # that gets rejected.
    print("\n### PHASE 7: Capacity Enforcement ###")
    simulate_registration("Node-DEV-003", "developer", model)
    simulate_registration("Node-DEV-004", "developer", model)
    simulate_registration("Node-DEV-005", "developer", model)  # Fills the last slot
    simulate_registration("Node-DEV-006", "developer", model)  # Should fail
    print_state_summary()

    print("\n" + banner)
    print("DEMONSTRATION COMPLETE")
    print(banner)

    # Cleanup
    print("\n🧹 Cleaning up...")
    cleanup()
    print("✅ Cleanup complete")
if __name__ == "__main__":
    # Script entry point: run the demo and always remove the state file,
    # even when the run is interrupted or fails.
    try:
        run_demo()
    except KeyboardInterrupt:
        print("\n\n⚠️ Demo interrupted by user")
        cleanup()
    except Exception as e:
        print(f"\n\n❌ Demo failed: {e}")
        import traceback
        traceback.print_exc()
        cleanup()
        # Report the failure to the caller instead of exiting with status 0.
        sys.exit(1)