import logging
import threading
import time
from typing import Optional

from .core import get_state_file, read_with_lock, write_with_lock
from .config import ConfigManager
class HeartbeatMonitor:
    """
    Background monitor that periodically inspects the shared state file.

    Each tick it:

    1. Releases the global file locks (``active_files``) when the state's
       ``last_heartbeat`` is older than the configured ``heartbeat_timeout``
       (default 60 s), appending an alert to ``summary`` and setting
       ``ask_user``.
    2. Marks non-bootstrap nodes that have been ``idle`` for longer than
       ``cluster_settings.grace_period_seconds`` (default 60 s) as
       ``terminated`` and refreshes the ``cluster_metadata`` counts.
    3. Flags recovery (summary alert, ``ask_user`` and a high-priority
       ``next_steps`` entry) when no ``bootstrap_manager`` node has sent a
       heartbeat within ``BOOTSTRAP_STALE_SECONDS``.

    The state transformations live in pure helpers (``_apply_*``,
    ``_terminate_idle_nodes``, ...) that only mutate the dict they are given;
    the ``_check_*`` / ``_handle_*`` / ``_recover_*`` methods wrap them with
    the locked read/write calls.
    """

    # A bootstrap_manager whose last heartbeat is older than this many
    # seconds is treated as missing.
    BOOTSTRAP_STALE_SECONDS = 60

    # Summary text used both to raise the bootstrap alert and to detect that
    # it was already raised, so it is not appended on every tick.
    _BOOTSTRAP_ALERT = "🚨 SYSTEM: Bootstrap Manager is missing or stale."

    def __init__(self, interval: int = 5):
        """
        Args:
            interval: Seconds to wait between health checks.
        """
        self.interval = interval
        self.running = False
        self.thread: Optional[threading.Thread] = None
        self._config = ConfigManager()
        # Set by stop() so the loop wakes immediately instead of sleeping out
        # the rest of the interval.
        self._stop_event = threading.Event()

    # ------------------------------------------------------------------
    # Thread lifecycle
    # ------------------------------------------------------------------

    def start(self):
        """Start the daemon monitor thread; a no-op if it is already running."""
        if self.running and self.thread is not None and self.thread.is_alive():
            # A second loop would race the first one on the state file.
            return
        self._stop_event.clear()
        self.running = True
        self.thread = threading.Thread(target=self._loop, daemon=True)
        self.thread.start()

    def stop(self):
        """Signal the monitor thread to stop and wait up to 1 s for it to exit."""
        self.running = False
        self._stop_event.set()
        # Joining the current thread raises RuntimeError, e.g. if stop() is
        # reached from code running on the monitor thread itself.
        if self.thread is not None and self.thread is not threading.current_thread():
            self.thread.join(timeout=1.0)

    def _loop(self):
        """Run health checks every ``interval`` seconds until stopped."""
        while self.running:
            try:
                self._check_health()
            except Exception:
                # Best effort: one failing tick must not kill the monitor.
                # Logged at DEBUG so nothing reaches stderr unless debug
                # logging is enabled, but the traceback is no longer lost.
                logging.getLogger(__name__).debug(
                    "Heartbeat health check failed", exc_info=True
                )
            # Event.wait() returns True as soon as stop() sets the event.
            if self._stop_event.wait(self.interval):
                break

    # ------------------------------------------------------------------
    # Health checks (state-file I/O wrappers)
    # ------------------------------------------------------------------

    def _check_health(self):
        """Run one monitoring pass: zombie locks, idle nodes, bootstrap manager."""
        state_file = get_state_file()
        if not state_file.exists():
            return

        # NOTE(review): read_with_lock / write_with_lock are separate calls,
        # presumably each taking the lock independently (verify in .core), so
        # another writer's update between a read and the following write can
        # be overwritten.
        state = read_with_lock(state_file)
        if not state:
            return

        # 1. Zombie agents still holding files.
        active_files = state.get("active_files") or []
        # `or 0` also covers an explicit None, which would otherwise crash the
        # subtraction below and abort the whole tick.
        last_heartbeat = state.get("last_heartbeat") or 0
        timeout = self._config.get("heartbeat_timeout", 60)
        if active_files and last_heartbeat:
            time_since_heartbeat = time.time() - last_heartbeat
            if time_since_heartbeat > timeout:
                self._handle_zombie(state, state_file, time_since_heartbeat)
                # Refresh state after the zombie write.
                state = read_with_lock(state_file)
                if not state:
                    return

        # 2. Idle agents past the grace period become terminated.
        self._check_idle_agents(state, state_file)
        state = read_with_lock(state_file)  # Refresh after idle check
        if not state:
            return

        # 3. Missing or stale bootstrap manager.
        self._check_cluster_topology(state, state_file)

    def _check_idle_agents(self, state, state_file):
        """Mark idle agents as terminated after the grace period and persist.

        The grace period is ``cluster_settings.grace_period_seconds`` from the
        config (default 60 s); bootstrap_manager nodes are never terminated.
        The state is written only if at least one node changed.
        """
        cluster_nodes = state.get("cluster_nodes") or {}
        cluster_settings = self._config.get("cluster_settings", {}) or {}
        idle_timeout = cluster_settings.get("grace_period_seconds", 60)

        if not self._terminate_idle_nodes(cluster_nodes, idle_timeout, time.time()):
            return
        self._refresh_cluster_metadata(state, cluster_nodes)
        state["timestamp"] = time.time()
        write_with_lock(state_file, state)

    def _check_cluster_topology(self, state, state_file):
        """Flag recovery when no bootstrap_manager node has a recent heartbeat.

        Note: this runs whether or not any work is in progress; an empty
        ``cluster_nodes`` also triggers the (one-time) recovery alert.
        """
        cluster_nodes = state.get("cluster_nodes") or {}
        if not self._has_active_bootstrap(cluster_nodes, time.time()):
            self._recover_bootstrap_manager(state, state_file)

    def _recover_bootstrap_manager(self, state, state_file):
        """Raise the missing-bootstrap alert in *state* and persist it.

        Nothing is written if the alert is already present in ``summary``.
        """
        if self._apply_bootstrap_recovery(state, time.time()):
            write_with_lock(state_file, state)

    def _handle_zombie(self, state, state_file, lag):
        """Release the stale locks held in *state* and persist it.

        Args:
            state: Freshly read state dict; mutated in place.
            state_file: State file path as returned by ``get_state_file()``.
            lag: Seconds since the last heartbeat, shown in the alert text.
        """
        self._apply_zombie_release(state, lag, time.time())
        write_with_lock(state_file, state)

    # ------------------------------------------------------------------
    # Pure state transformations (no I/O)
    # ------------------------------------------------------------------

    @staticmethod
    def _terminate_idle_nodes(cluster_nodes, idle_timeout, now):
        """Mark stale idle, non-bootstrap nodes as terminated.

        A node is stale when its ``status`` is ``"idle"``, it has a truthy
        ``idle_since`` and ``now - idle_since > idle_timeout``.

        Returns:
            True if at least one node's status was changed.
        """
        modified = False
        for node_info in cluster_nodes.values():
            # Bootstrap managers never auto-terminate.
            if node_info.get("role") == "bootstrap_manager":
                continue
            idle_since = node_info.get("idle_since")
            if (
                node_info.get("status") == "idle"
                and idle_since
                and now - idle_since > idle_timeout
            ):
                node_info["status"] = "terminated"
                modified = True
        return modified

    @staticmethod
    def _refresh_cluster_metadata(state, cluster_nodes):
        """Recompute ``active_agent_count`` and ``idle_agent_count`` in *state*.

        "Active" means any status other than ``"terminated"`` (idle included).
        """
        statuses = [node.get("status") for node in cluster_nodes.values()]
        metadata = state.get("cluster_metadata")
        if not isinstance(metadata, dict):
            metadata = state["cluster_metadata"] = {}
        metadata["active_agent_count"] = sum(1 for s in statuses if s != "terminated")
        metadata["idle_agent_count"] = statuses.count("idle")

    @classmethod
    def _has_active_bootstrap(cls, cluster_nodes, now):
        """Return True if a bootstrap_manager heartbeat is newer than the cutoff."""
        return any(
            info.get("role") == "bootstrap_manager"
            and now - (info.get("last_heartbeat") or 0) < cls.BOOTSTRAP_STALE_SECONDS
            for info in cluster_nodes.values()
        )

    @classmethod
    def _apply_bootstrap_recovery(cls, state, now):
        """Raise the missing-bootstrap alert in *state* unless already raised.

        Appends the alert to ``summary``, sets ``ask_user`` and prepends a
        high-priority recovery task to ``next_steps``. A plain-string
        ``next_steps`` is wrapped as a pending task; a missing/empty one
        becomes an empty list first.

        Returns:
            True if *state* was modified, False if the alert was already present.
        """
        summary = state.get("summary") or ""
        if cls._BOOTSTRAP_ALERT in summary:
            return False  # Already flagged

        state["summary"] = summary + f"\n\n{cls._BOOTSTRAP_ALERT} Recovery initiated."
        state["ask_user"] = True  # Prompt human to talk to a new bootstrap agent

        next_steps = state.get("next_steps") or []
        if isinstance(next_steps, str):
            next_steps = [{"task": next_steps, "status": "pending"}]

        recovery_task = {
            "task": "RESTORE BOOTSTRAP MANAGER: A node must join as bootstrap_manager.",
            "status": "pending",
            "priority": "high",
            "id": f"rec-{int(now)}",
        }
        next_steps.insert(0, recovery_task)
        state["next_steps"] = next_steps
        return True

    @staticmethod
    def _apply_zombie_release(state, lag, now):
        """Clear ``active_files`` in *state* and append a crash alert.

        The crashed node is taken to be the first cluster node whose
        ``last_heartbeat`` equals the state's global ``last_heartbeat``
        (``"Unknown"`` if none matches). If that node is a bootstrap_manager,
        ``bootstrap_required`` is set as well. ``ask_user`` is set and both
        ``timestamp`` and ``last_heartbeat`` are reset to *now*.
        """
        cluster_nodes = state.get("cluster_nodes") or {}
        last_heartbeat = state.get("last_heartbeat")
        # NOTE(review): exact float equality is a fragile way to attribute the
        # global heartbeat to a node; consider recording the owner explicitly.
        agent_id = next(
            (aid for aid, info in cluster_nodes.items()
             if info.get("last_heartbeat") == last_heartbeat),
            "Unknown",
        )
        role = cluster_nodes.get(agent_id, {}).get("role", "worker")

        message = f"\n\n🚨 SYSTEM: Node {agent_id} ({role}) crashed (Heartbeat Timeout: {lag:.1f}s). Locks released."
        if role == "bootstrap_manager":
            message += "\n⚠️ BOOTSTRAP MANAGER DOWN. Cluster requires re-initialization or election of a new manager."
            state["bootstrap_required"] = True

        state["active_files"] = []
        state["summary"] = (state.get("summary") or "") + message
        state["ask_user"] = True
        state["timestamp"] = now
        state["last_heartbeat"] = now