#![allow(dead_code)]
use anyhow::{anyhow, Result};
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::env;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};
use url::Url;
use crate::orchestrator::web_config::WebConfig;
use crate::web::chrome::ChromeFetchResult;
use crate::web::playwright::{fetch_dom as fetch_dom_playwright, PlaywrightFetchConfig};
use crate::metrics;
/// Timing knobs for the Chrome watchdog loop and its reaper.
#[derive(Clone, Debug)]
pub struct ChromeWatchdogConfig {
/// Pause between two consecutive watchdog scans.
pub scan_interval: Duration,
/// How long the process of an ended (inactive) session may linger before it is reaped.
pub orphan_reap_after: Duration,
/// How long to wait for the process to exit after the graceful termination request.
pub graceful_shutdown_timeout: Duration,
/// How long to wait for the process to exit after the forced kill.
pub kill_timeout: Duration,
/// Optional cap on the age of an active session; `None` disables the check.
pub max_session_age: Option<Duration>,
/// Optional heartbeat staleness limit; `None` disables the check. Sessions that
/// never sent a heartbeat are not subject to it.
pub unresponsive_timeout: Option<Duration>,
}
impl Default for ChromeWatchdogConfig {
    /// Scan every 5 s, reap orphans after 30 s, allow 2 s for each termination
    /// stage, and leave the age and heartbeat limits switched off.
    fn default() -> Self {
        let termination_stage_wait = Duration::from_secs(2);
        Self {
            max_session_age: None,
            unresponsive_timeout: None,
            scan_interval: Duration::from_secs(5),
            orphan_reap_after: Duration::from_secs(30),
            graceful_shutdown_timeout: termination_stage_wait,
            kill_timeout: termination_stage_wait,
        }
    }
}
impl ChromeWatchdogConfig {
    /// Builds the watchdog configuration from `DOCDEX_CHROME_WATCHDOG_*` environment variables.
    ///
    /// Returns `None` only when `DOCDEX_CHROME_WATCHDOG_ENABLED` holds a recognised
    /// "false" spelling; a missing or unrecognised value leaves the watchdog enabled.
    /// Every other variable is a millisecond count that overrides the matching
    /// [`Default`] field when it is set and parses as a `u64`.
    pub fn from_env() -> Option<Self> {
        let enabled = env_boolish("DOCDEX_CHROME_WATCHDOG_ENABLED").unwrap_or(true);
        if !enabled {
            return None;
        }
        let mut config = Self::default();
        if let Some(ms) = env_u64("DOCDEX_CHROME_WATCHDOG_SCAN_INTERVAL_MS") {
            // Floor at 10 ms so a tiny value cannot turn the loop into a busy spin.
            config.scan_interval = Duration::from_millis(ms.max(10));
        }
        if let Some(ms) = env_u64("DOCDEX_CHROME_WATCHDOG_ORPHAN_REAP_AFTER_MS") {
            // The value is unsigned, so no lower clamp is needed; zero means an
            // ended session is eligible for reaping on the very next scan.
            config.orphan_reap_after = Duration::from_millis(ms);
        }
        if let Some(ms) = env_u64("DOCDEX_CHROME_WATCHDOG_GRACEFUL_TIMEOUT_MS") {
            config.graceful_shutdown_timeout = Duration::from_millis(ms.max(10));
        }
        if let Some(ms) = env_u64("DOCDEX_CHROME_WATCHDOG_KILL_TIMEOUT_MS") {
            config.kill_timeout = Duration::from_millis(ms.max(10));
        }
        if let Some(ms) = env_u64("DOCDEX_CHROME_WATCHDOG_MAX_SESSION_AGE_MS") {
            // Setting the variable at all opts in to the age limit (minimum 1 ms).
            config.max_session_age = Some(Duration::from_millis(ms.max(1)));
        }
        if let Some(ms) = env_u64("DOCDEX_CHROME_WATCHDOG_UNRESPONSIVE_TIMEOUT_MS") {
            // Setting the variable at all opts in to the heartbeat limit (minimum 1 ms).
            config.unresponsive_timeout = Some(Duration::from_millis(ms.max(1)));
        }
        Some(config)
    }
}
/// Reads `key` from the environment and parses it as a `u64`.
///
/// Surrounding whitespace is ignored. Returns `None` when the variable is unset,
/// not valid Unicode, blank, or not an unsigned integer.
fn env_u64(key: &str) -> Option<u64> {
    let value = env::var(key).ok()?;
    match value.trim() {
        "" => None,
        digits => digits.parse().ok(),
    }
}
/// Reads `key` from the environment and interprets it as a boolean flag.
///
/// Matching ignores ASCII case and surrounding whitespace. Returns `None` when
/// the variable is unset or holds none of the recognised spellings.
fn env_boolish(key: &str) -> Option<bool> {
    const TRUTHY: [&str; 6] = ["1", "true", "t", "yes", "y", "on"];
    const FALSY: [&str; 6] = ["0", "false", "f", "no", "n", "off"];

    let value = env::var(key).ok()?;
    let value = value.trim();
    let is_one_of = |words: &[&str]| words.iter().any(|word| value.eq_ignore_ascii_case(word));

    if is_one_of(&TRUTHY) {
        Some(true)
    } else if is_one_of(&FALSY) {
        Some(false)
    } else {
        None
    }
}
/// Identifies the OS process (and optionally its process group) behind a session.
#[derive(Clone, Debug)]
pub struct TrackedProcess {
/// OS process id.
pub pid: u32,
/// Process group id. When set, the Unix liveness checks and signals target the
/// whole group instead of `pid`.
pub process_group_id: Option<i32>,
}
/// Cloneable front end for registering sessions with the watchdog.
/// Clones share the same underlying state through the `Arc`.
#[derive(Clone)]
pub struct ChromeProcessTracker {
// Shared state, also held by session handles and the watchdog loop.
inner: Arc<Inner>,
}
impl ChromeProcessTracker {
    /// Starts tracking `process` under `session_id` and returns a handle for the new session.
    ///
    /// When `session_id` is already tracked, the earlier record is marked inactive
    /// with reason `Replaced`; the new registration becomes the current one for
    /// that id.
    pub fn register(
        &self,
        session_id: impl Into<String>,
        process: TrackedProcess,
    ) -> ChromeSessionHandle {
        let session_id = session_id.into();
        // Copy out what the log line needs before `process` moves into the record.
        let pid = process.pid;
        let pgid = process.process_group_id;
        let token = {
            let mut state = self.inner.state.lock();
            let token = state.next_token;
            // Token 0 is never handed out, even after wrap-around.
            state.next_token = token.wrapping_add(1).max(1);
            // `insert` hands back the token this session id pointed at before, if any.
            if let Some(replaced) = state.session_current.insert(session_id.clone(), token) {
                if let Some(stale) = state.records.get_mut(&replaced) {
                    stale.active = false;
                    stale.ended_at = Some(Instant::now());
                    stale.end_reason = Some(SessionEndReason::Replaced);
                }
            }
            let record = SessionRecord {
                session_id: session_id.clone(),
                token,
                process,
                started_at: Instant::now(),
                last_heartbeat: None,
                active: true,
                ended_at: None,
                end_reason: None,
                reaping: false,
                last_reap_attempt: None,
            };
            state.records.insert(token, record);
            token
        };
        info!(
            target: "docdexd_browser_guard",
            event = "chrome_watchdog_session_started",
            session_id = session_id.as_str(),
            token,
            pid,
            pgid,
            "chrome watchdog tracking session"
        );
        ChromeSessionHandle {
            session_id,
            token,
            inner: Arc::clone(&self.inner),
        }
    }

    /// Marks the current registration of `session_id` as ended.
    ///
    /// Does nothing when the id is unknown or the session is already inactive.
    pub fn end_session(&self, session_id: &str) {
        let mut state = self.inner.state.lock();
        let token = match state.session_current.get(session_id) {
            Some(&token) => token,
            None => return,
        };
        let Some(record) = state.records.get_mut(&token) else {
            return;
        };
        if !record.active {
            return;
        }
        record.active = false;
        record.ended_at = Some(Instant::now());
        record.end_reason = Some(SessionEndReason::Ended);
        info!(
            target: "docdexd_browser_guard",
            event = "chrome_watchdog_session_ended",
            session_id = record.session_id.as_str(),
            token = record.token,
            pid = record.process.pid,
            pgid = record.process.process_group_id,
            reason = ?SessionEndReason::Ended,
            "chrome watchdog session ended"
        );
    }

    /// Records a heartbeat for the current registration of `session_id`.
    ///
    /// Unknown ids and inactive sessions are ignored.
    pub fn heartbeat(&self, session_id: &str) {
        let mut state = self.inner.state.lock();
        let token = match state.session_current.get(session_id) {
            Some(&token) => token,
            None => return,
        };
        if let Some(record) = state.records.get_mut(&token).filter(|record| record.active) {
            record.last_heartbeat = Some(Instant::now());
        }
    }
}
/// Handle to one registered session; dropping it marks the session inactive.
#[derive(Clone)]
pub struct ChromeSessionHandle {
// Session id supplied at registration.
session_id: String,
// Key of this registration's record in the shared state.
token: u64,
// Shared watchdog state.
inner: Arc<Inner>,
}
impl ChromeSessionHandle {
    /// Session id this handle was registered under.
    pub fn session_id(&self) -> &str {
        self.session_id.as_str()
    }

    /// Pid of the tracked process, or `0` once the record has been removed.
    pub fn pid(&self) -> u32 {
        let state = self.inner.state.lock();
        state
            .records
            .get(&self.token)
            .map_or(0, |record| record.process.pid)
    }

    /// Records a heartbeat for this session; ignored once the session is inactive.
    pub fn heartbeat(&self) {
        let mut state = self.inner.state.lock();
        let Some(record) = state.records.get_mut(&self.token) else {
            return;
        };
        if record.active {
            record.last_heartbeat = Some(Instant::now());
        }
    }

    /// Marks this session as ended; does nothing if it is already inactive or gone.
    pub fn end(&self) {
        let mut state = self.inner.state.lock();
        let Some(record) = state.records.get_mut(&self.token) else {
            return;
        };
        if !record.active {
            return;
        }
        record.active = false;
        record.ended_at = Some(Instant::now());
        record.end_reason = Some(SessionEndReason::Ended);
        info!(
            target: "docdexd_browser_guard",
            event = "chrome_watchdog_session_ended",
            session_id = record.session_id.as_str(),
            token = record.token,
            pid = record.process.pid,
            pgid = record.process.process_group_id,
            reason = ?SessionEndReason::Ended,
            "chrome watchdog session ended"
        );
    }
}
impl Drop for ChromeSessionHandle {
    /// Marks the session inactive with reason `Dropped` if it is still active.
    ///
    /// The handle is `Clone`, and this runs for every clone: the first clone to
    /// be dropped ends the session even while other clones are still alive.
    fn drop(&mut self) {
        let mut state = self.inner.state.lock();
        let Some(record) = state.records.get_mut(&self.token) else {
            return;
        };
        if !record.active {
            // Already ended or replaced; keep the original reason and timestamp.
            return;
        }
        record.active = false;
        record.ended_at = Some(Instant::now());
        record.end_reason = Some(SessionEndReason::Dropped);
        debug!(
            target: "docdexd_browser_guard",
            event = "chrome_watchdog_session_dropped",
            session_id = record.session_id.as_str(),
            token = record.token,
            pid = record.process.pid,
            pgid = record.process.process_group_id,
            reason = ?SessionEndReason::Dropped,
            "chrome watchdog session dropped"
        );
    }
}
/// An owned watchdog: a tracker plus the background task that scans it.
pub struct ChromeWatchdog {
// Front end handed out to callers via `tracker()`.
tracker: ChromeProcessTracker,
// Handle of the spawned watchdog loop, awaited by `shutdown`.
join: JoinHandle<()>,
}
impl ChromeWatchdog {
    /// Creates a tracker and spawns its watchdog loop with `tokio::spawn`.
    pub fn start(config: ChromeWatchdogConfig) -> Self {
        let (tracker, inner) = new_tracker(config);
        Self {
            join: tokio::spawn(watchdog_loop(inner)),
            tracker,
        }
    }

    /// Returns a clone of the tracker backed by this watchdog's state.
    pub fn tracker(&self) -> ChromeProcessTracker {
        self.tracker.clone()
    }

    /// Requests the watchdog loop to stop and waits for the task to finish.
    pub async fn shutdown(self) {
        let Self { tracker, join } = self;
        // Raise the flag first so the loop sees it as soon as the notification wakes it.
        tracker.inner.shutdown.store(true, Ordering::Release);
        tracker.inner.shutdown_notify.notify_one();
        // A join error is deliberately ignored.
        let _ = join.await;
    }
}
/// Process-wide tracker; populated at most once, by the first `init_global` call.
static GLOBAL_TRACKER: OnceCell<ChromeProcessTracker> = OnceCell::new();
/// Initializes the detached, process-wide watchdog tracker on the first call.
///
/// The first call builds the tracker from `config` and, if a Tokio runtime is
/// current, spawns the watchdog loop onto it. Without a runtime the tracker is
/// still created and records sessions, but nothing scans or reaps them.
///
/// Initialization happens exactly once. Every later call returns a clone of the
/// tracker created by the first call: its `config` argument is ignored and no
/// watchdog loop is spawned, even if a runtime has become available since.
/// Make the first call from inside the runtime that should host the loop.
pub fn init_global(config: ChromeWatchdogConfig) -> ChromeProcessTracker {
    GLOBAL_TRACKER
        .get_or_init(|| {
            let (tracker, inner) = new_tracker(config);
            if let Ok(handle) = tokio::runtime::Handle::try_current() {
                // Detached on purpose: the join handle is dropped, so the loop
                // lives as long as the runtime does.
                handle.spawn(watchdog_loop(inner));
            } else {
                warn!(
                    target: "docdexd",
                    "chrome watchdog initialized without a Tokio runtime; periodic reaping is disabled"
                );
            }
            tracker
        })
        .clone()
}
pub fn init_global_from_env() -> Option<ChromeProcessTracker> {
ChromeWatchdogConfig::from_env().map(init_global)
}
/// Returns a clone of the global tracker, or `None` if it has not been initialized.
pub fn global_tracker() -> Option<ChromeProcessTracker> {
    let tracker = GLOBAL_TRACKER.get()?;
    Some(tracker.clone())
}
/// Builds fresh shared state and returns it with a tracker that points at it.
///
/// The `Arc<Inner>` is returned separately so the caller can hand it to the watchdog loop.
fn new_tracker(config: ChromeWatchdogConfig) -> (ChromeProcessTracker, Arc<Inner>) {
    let empty_state = State {
        // Tokens start at 1.
        next_token: 1,
        session_current: HashMap::new(),
        records: HashMap::new(),
    };
    let shared = Arc::new(Inner {
        config,
        state: Mutex::new(empty_state),
        shutdown: AtomicBool::new(false),
        shutdown_notify: Notify::new(),
    });
    let tracker = ChromeProcessTracker {
        inner: Arc::clone(&shared),
    };
    (tracker, shared)
}
/// State shared between the tracker, its session handles and the watchdog loop.
struct Inner {
// Timings read by the watchdog loop and the reaper.
config: ChromeWatchdogConfig,
// Session bookkeeping, guarded by a mutex.
state: Mutex<State>,
// Set by `ChromeWatchdog::shutdown`; checked at the top of every loop iteration.
shutdown: AtomicBool,
// Wakes the watchdog loop so it re-checks `shutdown` without waiting out the scan interval.
shutdown_notify: Notify,
}
/// Mutable session bookkeeping kept behind `Inner::state`.
struct State {
// Token the next registration receives; starts at 1 and skips 0 on wrap-around.
next_token: u64,
// Session id -> token of the most recent registration for that id.
session_current: HashMap<String, u64>,
// Token -> record; ended sessions stay here until their process is gone.
records: HashMap<u64, SessionRecord>,
}
/// Why a session stopped being active.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SessionEndReason {
// The session handle was dropped while the session was still active.
Dropped,
// The session was ended explicitly via `end` or `end_session`.
Ended,
// A new registration reused the same session id.
Replaced,
}
/// Everything the watchdog knows about one registration.
struct SessionRecord {
// Session id supplied at registration.
session_id: String,
// Key of this record in `State::records`.
token: u64,
// Process (and optional process group) to watch and, if needed, terminate.
process: TrackedProcess,
// When the registration happened; basis for the max-session-age check.
started_at: Instant,
// Most recent heartbeat, or `None` if the session never sent one.
last_heartbeat: Option<Instant>,
// False once the session was ended, dropped or replaced.
active: bool,
// When the session became inactive.
ended_at: Option<Instant>,
// Why the session became inactive.
end_reason: Option<SessionEndReason>,
// Set by the scan when it selects this record for reaping; cleared again if the reap fails.
reaping: bool,
// Time of the scan that last selected this record for reaping.
// NOTE(review): written but never read in the code visible here.
last_reap_attempt: Option<Instant>,
}
/// Why the watchdog decided to terminate a tracked process.
#[derive(Debug, Clone, Copy)]
enum ReapReason {
// The session is inactive and `orphan_reap_after` has elapsed since it ended.
SessionOrphaned,
// The session is active but older than `max_session_age`.
SessionTooOld,
// The session is active but its last heartbeat is older than `unresponsive_timeout`.
SessionUnresponsive,
}
/// Runs a scan every `scan_interval` until the shutdown flag is set.
///
/// A shutdown notification interrupts the sleep so the flag is re-checked
/// straight away instead of after the remaining interval.
async fn watchdog_loop(inner: Arc<Inner>) {
    while !inner.shutdown.load(Ordering::Acquire) {
        tokio::select! {
            // Woken early: fall through to the flag check at the top of the loop.
            _ = inner.shutdown_notify.notified() => {}
            _ = tokio::time::sleep(inner.config.scan_interval) => {
                run_scan(&inner).await;
            }
        }
    }
}
/// Performs one watchdog pass over every tracked record.
///
/// Under the state lock it drops records whose process has already exited and
/// selects records whose process should be terminated. The terminations
/// themselves run afterwards, one at a time, with the lock released.
async fn run_scan(inner: &Arc<Inner>) {
let now = Instant::now();
// Reap work and removals are collected while iterating and applied afterwards.
let mut candidates: Vec<(String, u64, TrackedProcess, ReapReason)> = Vec::new();
let mut to_remove: Vec<u64> = Vec::new();
{
let mut state = inner.state.lock();
for (token, record) in state.records.iter_mut() {
// Skip records already flagged as being reaped.
if record.reaping {
continue;
}
if !is_process_alive(&record.process) {
// Note: the "reaped" counter is also bumped when the process exited on its own.
metrics::global().inc_chrome_watchdog_reaped();
debug!(
target: "docdexd",
session_id = record.session_id.as_str(),
pid = record.process.pid,
"watchdog: tracked Chrome process already exited; removing record"
);
to_remove.push(*token);
continue;
}
// Decide whether the still-running process should be terminated, and why.
let reason = if !record.active {
// Inactive session: give the process `orphan_reap_after` to exit by itself.
let ended_at = record.ended_at.unwrap_or(record.started_at);
if now.saturating_duration_since(ended_at) >= inner.config.orphan_reap_after {
Some(ReapReason::SessionOrphaned)
} else {
None
}
} else if inner
.config
.max_session_age
.is_some_and(|ttl| now.saturating_duration_since(record.started_at) >= ttl)
{
Some(ReapReason::SessionTooOld)
} else if inner.config.unresponsive_timeout.is_some() {
// The timeout is measured from the last heartbeat, so a session that never
// sent one (`last_heartbeat` is `None`) is never treated as unresponsive.
record.last_heartbeat.and_then(|last| {
inner.config.unresponsive_timeout.and_then(|timeout| {
(now.saturating_duration_since(last) >= timeout)
.then_some(ReapReason::SessionUnresponsive)
})
})
} else {
None
};
if let Some(reason) = reason {
// Flag the record before the lock is released so it is not selected twice.
record.reaping = true;
record.last_reap_attempt = Some(now);
candidates.push((
record.session_id.clone(),
record.token,
record.process.clone(),
reason,
));
}
}
for token in &to_remove {
let Some(record) = state.records.remove(token) else {
continue;
};
// Only clear the session-id mapping if it still points at the removed record;
// a newer registration under the same id must keep its entry.
if state
.session_current
.get(&record.session_id)
.is_some_and(|current| *current == record.token)
{
state.session_current.remove(&record.session_id);
}
}
}
// Lock released: terminate the selected processes sequentially.
for (session_id, token, process, reason) in candidates {
reap_one(inner, &session_id, token, process, reason).await;
}
}
/// Terminates one tracked process and reconciles its record with the outcome.
///
/// If the process is gone afterwards, the record is removed, along with the
/// session-id mapping when it still points at this record. Otherwise the
/// record's `reaping` flag is cleared so a later scan can select it again.
async fn reap_one(
    inner: &Arc<Inner>,
    session_id: &str,
    token: u64,
    process: TrackedProcess,
    reason: ReapReason,
) {
    let (pid, pgid) = (process.pid, process.process_group_id);
    metrics::global().inc_chrome_watchdog_reap_attempt();
    info!(
        target: "docdexd",
        event = "chrome_watchdog_reap_start",
        session_id,
        pid,
        pgid,
        ?reason,
        "watchdog: reaping orphaned/unhealthy Chrome process"
    );

    let outcome = terminate_process(
        &process,
        inner.config.graceful_shutdown_timeout,
        inner.config.kill_timeout,
    )
    .await;
    if let Err(err) = outcome {
        warn!(
            target: "docdexd",
            event = "chrome_watchdog_reap_done",
            session_id,
            pid,
            error = %err,
            "watchdog: terminate attempt failed"
        );
    } else {
        debug!(
            target: "docdexd",
            event = "chrome_watchdog_reap_done",
            session_id,
            pid,
            "watchdog: terminate attempt complete"
        );
    }

    // The lock is taken only now, after the await above has completed.
    let mut state = inner.state.lock();
    let still_alive = match state.records.get(&token) {
        Some(record) => is_process_alive(&record.process),
        // The record disappeared in the meantime; nothing left to reconcile.
        None => return,
    };
    if still_alive {
        metrics::global().inc_chrome_watchdog_reap_failure();
        if let Some(record) = state.records.get_mut(&token) {
            // Make the record eligible for another attempt on a later scan.
            record.reaping = false;
        }
        return;
    }

    metrics::global().inc_chrome_watchdog_reaped();
    if let Some(record) = state.records.remove(&token) {
        // Leave the mapping alone if a newer registration has taken over the session id.
        if state.session_current.get(&record.session_id) == Some(&record.token) {
            state.session_current.remove(&record.session_id);
        }
    }
}
/// Reports whether the tracked process (or its process group, when one is recorded) still exists.
///
/// On non-Unix platforms there is no check and the answer is always `true`.
fn is_process_alive(process: &TrackedProcess) -> bool {
    #[cfg(unix)]
    {
        match process.process_group_id {
            // A recorded group takes precedence over the single pid.
            Some(pgid) => process_group_alive(pgid),
            None => pid_alive(process.pid as i32),
        }
    }
    #[cfg(not(unix))]
    {
        // Best-effort: if we can't reliably check, keep the record and rely on termination attempts.
        let _ = process;
        true
    }
}
/// Terminates `process`, first gracefully and then by force.
///
/// After each request it waits (up to `graceful_timeout`, then `kill_timeout`) for
/// `is_process_alive` to report the process gone. Returns `Ok(())` as soon as that
/// happens and `Err` with a description when the process outlives both stages or
/// the platform is unsupported.
async fn terminate_process(
process: &TrackedProcess,
graceful_timeout: Duration,
kill_timeout: Duration,
) -> Result<(), String> {
#[cfg(unix)]
{
// With a recorded process group, signal and watch the whole group.
if let Some(pgid) = process.process_group_id {
signal_process_group(pgid, nix::libc::SIGTERM);
if wait_until_dead(process, graceful_timeout).await {
return Ok(());
}
debug!(
target: "docdexd",
event = "chrome_watchdog_kill_escalation",
pid = process.pid,
pgid,
"watchdog escalating to SIGKILL"
);
signal_process_group(pgid, nix::libc::SIGKILL);
if wait_until_dead(process, kill_timeout).await {
return Ok(());
}
return Err("process group did not exit after SIGKILL".to_string());
}
// No process group recorded: same two-stage sequence against the single pid.
signal_pid(process.pid as i32, nix::libc::SIGTERM);
if wait_until_dead(process, graceful_timeout).await {
return Ok(());
}
debug!(
target: "docdexd",
event = "chrome_watchdog_kill_escalation",
pid = process.pid,
"watchdog escalating to SIGKILL"
);
signal_pid(process.pid as i32, nix::libc::SIGKILL);
if wait_until_dead(process, kill_timeout).await {
return Ok(());
}
return Err("process did not exit after SIGKILL".to_string());
}
#[cfg(windows)]
{
// taskkill supports graceful-ish termination and force.
// NOTE(review): `is_process_alive` always returns true on non-Unix targets, so
// none of the `wait_until_dead` calls below can succeed; as written this branch
// runs both taskkill stages and then returns `Err` unless taskkill itself fails first.
if wait_until_dead(process, Duration::from_millis(1)).await {
return Ok(());
}
terminate_windows(process.pid, false).await?;
if wait_until_dead(process, graceful_timeout).await {
return Ok(());
}
terminate_windows(process.pid, true).await?;
if wait_until_dead(process, kill_timeout).await {
return Ok(());
}
Err("process did not exit after taskkill /F".to_string())
}
#[cfg(all(not(unix), not(windows)))]
{
// Neither signals nor taskkill are available; consume the arguments and report it.
let _ = (process, graceful_timeout, kill_timeout);
Err("process termination is not supported on this platform".to_string())
}
}
/// Polls every 25 ms until `process` is gone or `timeout` has elapsed.
///
/// Returns `true` if the process was observed dead; one final check is made
/// once the deadline has passed.
async fn wait_until_dead(process: &TrackedProcess, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if Instant::now() >= deadline {
            // Out of time: the answer is whatever a last look says.
            return !is_process_alive(process);
        }
        if !is_process_alive(process) {
            return true;
        }
        tokio::time::sleep(Duration::from_millis(25)).await;
    }
}
/// Probes `pid` with signal 0 and reports whether the process still exists.
///
/// Only `ESRCH` counts as "gone"; any other failure is treated as alive.
#[cfg(unix)]
fn pid_alive(pid: i32) -> bool {
    // SAFETY: `kill` takes plain integers and has no memory-safety preconditions.
    let probe = unsafe { nix::libc::kill(pid, 0) };
    if probe == 0 {
        return true;
    }
    let errno = std::io::Error::last_os_error().raw_os_error();
    errno != Some(nix::libc::ESRCH)
}
/// Probes process group `pgid` with signal 0 and reports whether it still exists.
///
/// Only `ESRCH` counts as "gone"; any other failure is treated as alive.
#[cfg(unix)]
fn process_group_alive(pgid: i32) -> bool {
    // SAFETY: `killpg` takes plain integers and has no memory-safety preconditions.
    let probe = unsafe { nix::libc::killpg(pgid, 0) };
    if probe == 0 {
        return true;
    }
    let errno = std::io::Error::last_os_error().raw_os_error();
    errno != Some(nix::libc::ESRCH)
}
#[cfg(unix)]
fn signal_process_group(pgid: i32, signal: i32) {
let rc = unsafe { nix::libc::killpg(pgid, signal) };
if rc == -1 {
let err = std::io::Error::last_os_error();
if err.raw_os_error() != Some(nix::libc::ESRCH) {
debug!(target: "docdexd", "killpg({pgid},{signal}) failed: {err}");
}
}
}
#[cfg(unix)]
fn signal_pid(pid: i32, signal: i32) {
let rc = unsafe { nix::libc::kill(pid, signal) };
if rc == -1 {
let err = std::io::Error::last_os_error();
if err.raw_os_error() != Some(nix::libc::ESRCH) {
debug!(target: "docdexd", "kill({pid},{signal}) failed: {err}");
}
}
}
/// Runs `taskkill /PID <pid> /T`, adding `/F` when `force` is set.
///
/// Returns `Err` when taskkill cannot be run or exits unsuccessfully.
#[cfg(windows)]
async fn terminate_windows(pid: u32, force: bool) -> Result<(), String> {
    let pid_arg = pid.to_string();
    let mut taskkill = tokio::process::Command::new("taskkill");
    taskkill.arg("/PID");
    taskkill.arg(pid_arg);
    taskkill.arg("/T");
    if force {
        taskkill.arg("/F");
    }
    match taskkill.status().await {
        Err(err) => Err(format!("taskkill failed: {err}")),
        Ok(status) if status.success() => Ok(()),
        Ok(status) => Err(format!("taskkill exited with {status}")),
    }
}
/// Backend used to fetch a page's DOM. Playwright is the only engine defined here.
#[derive(Clone, Debug)]
pub enum ScraperEngine {
/// Fetch through Playwright with the given configuration.
Playwright { config: PlaywrightFetchConfig },
}
impl ScraperEngine {
    /// Selects the scraper engine described by `config`.
    ///
    /// Any engine name other than empty or "playwright" (compared
    /// case-insensitively after trimming) is logged and replaced by Playwright.
    ///
    /// # Errors
    ///
    /// Fails when `PlaywrightFetchConfig::from_web_config` yields no configuration.
    pub fn from_web_config(config: &WebConfig) -> Result<Self> {
        let requested = config.scraper_engine.trim();
        let is_supported = requested.is_empty() || requested.eq_ignore_ascii_case("playwright");
        if !is_supported {
            warn!(
                "web scraper engine {} is not supported; using playwright",
                config.scraper_engine
            );
        }
        let Some(playwright_config) = PlaywrightFetchConfig::from_web_config(config) else {
            return Err(anyhow!(
                "playwright fetch config unavailable; run `docdexd browser setup`"
            ));
        };
        Ok(ScraperEngine::Playwright {
            config: playwright_config,
        })
    }

    /// Fetches the DOM of `url` with the selected engine.
    pub async fn fetch_dom(&self, url: &Url) -> Result<ChromeFetchResult> {
        let ScraperEngine::Playwright { config } = self;
        fetch_dom_playwright(url, config).await
    }
}
#[cfg(test)]
mod tests {
#[cfg(unix)]
use super::*;
/// Finds a POSIX shell: `/bin/sh`, then `/usr/bin/sh`, then `$SHELL`.
///
/// Returns the first of those paths that exists, or `None` if none does.
#[cfg(unix)]
fn resolve_shell() -> Option<std::path::PathBuf> {
    use std::path::PathBuf;

    let well_known = ["/bin/sh", "/usr/bin/sh"]
        .iter()
        .map(|candidate| PathBuf::from(*candidate));
    let from_env = std::env::var_os("SHELL").map(PathBuf::from);
    well_known.chain(from_env).find(|path| path.exists())
}
/// Builds a `tokio` command for the shell found by `resolve_shell`, if any.
#[cfg(unix)]
fn shell_command() -> Option<tokio::process::Command> {
    resolve_shell().map(tokio::process::Command::new)
}
/// Extracts the integer value of counter `name` from Prometheus text output.
///
/// A line matches when its first field is exactly `name`, or is `name` followed
/// by a `{…}` label set. Comparing the whole series name, rather than a line
/// prefix, keeps a metric such as `foo_total_extra` from being read as
/// `foo_total`. The first matching line with a parseable `u64` value wins;
/// returns 0 when there is none.
fn prometheus_counter(text: &str, name: &str) -> u64 {
    text.lines()
        .filter_map(|line| {
            let mut fields = line.split_whitespace();
            let series = fields.next()?;
            // Drop a trailing label set so `name{a="b"}` still matches `name`.
            let metric = series.split('{').next().unwrap_or(series);
            if series != name && metric != name {
                return None;
            }
            fields.next()?.parse::<u64>().ok()
        })
        .next()
        .unwrap_or(0)
}
/// An ended session whose process keeps running must be terminated by the
/// watchdog, and the "reaped" counter must increase.
#[tokio::test]
#[cfg(unix)]
async fn reaps_orphaned_process_after_grace() {
use std::io;
use tempfile::TempDir;
// Snapshot the counter first; the metrics registry is global, so only the delta is checked.
let before = crate::metrics::global().render_prometheus();
let before_reaped = prometheus_counter(&before, "docdex_chrome_watchdog_reaped_total");
// Short timings keep the test fast.
let watchdog = ChromeWatchdog::start(ChromeWatchdogConfig {
scan_interval: Duration::from_millis(50),
orphan_reap_after: Duration::from_millis(100),
graceful_shutdown_timeout: Duration::from_millis(100),
kill_timeout: Duration::from_millis(200),
max_session_age: None,
unresponsive_timeout: None,
});
let tracker = watchdog.tracker();
let temp = TempDir::new().expect("temp dir");
let pid_file = temp.path().join("orphan.pid");
let mut cmd = match shell_command() {
Some(cmd) => cmd,
None => {
eprintln!("skipping: no POSIX shell found");
return;
}
};
// The helper shell backgrounds a long `sleep`, writes its pid to the file and exits.
cmd.arg("-c")
.arg(r#"nohup sleep 1000 >/dev/null 2>&1 & echo $! > "$1""#)
.arg("sh")
.arg(pid_file.as_os_str());
// `setsid` puts the helper, and the `sleep` it starts, in a session of their own,
// so signalling that process group cannot hit the test runner.
unsafe {
cmd.pre_exec(|| {
let rc = nix::libc::setsid();
if rc == -1 {
return Err(io::Error::last_os_error());
}
Ok(())
});
}
let mut child = cmd.spawn().expect("spawn helper process");
// Poll until the helper has written a parseable pid.
let deadline = Instant::now() + Duration::from_secs(2);
let sleep_pid: u32 = loop {
if let Ok(text) = std::fs::read_to_string(&pid_file) {
if let Ok(pid) = text.trim().parse::<u32>() {
break pid;
}
}
if Instant::now() > deadline {
panic!("timed out waiting for orphan pid file");
}
tokio::time::sleep(Duration::from_millis(25)).await;
};
// Reap the helper so it doesn't become a zombie itself.
let _ = tokio::time::timeout(Duration::from_secs(2), child.wait())
.await
.expect("helper wait timeout");
let pgid = unsafe { nix::libc::getpgid(sleep_pid as i32) };
assert_ne!(pgid, -1, "expected getpgid to succeed");
let handle = tracker.register(
"session-orphan",
TrackedProcess {
pid: sleep_pid,
process_group_id: Some(pgid),
},
);
handle.end(); // Session ended, process still alive -> should be reaped.
drop(handle);
// Give the watchdog up to 3 s to terminate the process group.
let deadline = Instant::now() + Duration::from_secs(3);
while Instant::now() < deadline && process_group_alive(pgid) {
tokio::time::sleep(Duration::from_millis(25)).await;
}
assert!(
!process_group_alive(pgid),
"expected orphaned process group to be reaped"
);
// The counter is bumped after termination completes, so poll for it briefly.
let metrics_deadline = Instant::now() + Duration::from_secs(2);
let after_reaped = loop {
let snapshot = crate::metrics::global().render_prometheus();
let value = prometheus_counter(&snapshot, "docdex_chrome_watchdog_reaped_total");
if value >= before_reaped + 1 || Instant::now() > metrics_deadline {
break value;
}
tokio::time::sleep(Duration::from_millis(25)).await;
};
assert!(
after_reaped >= before_reaped + 1,
"expected watchdog reaped counter to increment"
);
watchdog.shutdown().await;
}
/// An active session that never sends a heartbeat must survive even with an
/// unresponsive timeout configured; once ended, its process must be reaped.
#[tokio::test]
#[cfg(unix)]
async fn does_not_reap_active_session_without_opt_in_timeouts() {
    use std::io;
    use tempfile::TempDir;
    let watchdog = ChromeWatchdog::start(ChromeWatchdogConfig {
        scan_interval: Duration::from_millis(50),
        orphan_reap_after: Duration::from_millis(100),
        graceful_shutdown_timeout: Duration::from_millis(100),
        kill_timeout: Duration::from_millis(200),
        max_session_age: None,
        unresponsive_timeout: Some(Duration::from_millis(100)), // Opt-in, but requires heartbeats.
    });
    let tracker = watchdog.tracker();
    let temp = TempDir::new().expect("temp dir");
    let pid_file = temp.path().join("active.pid");
    let mut cmd = match shell_command() {
        Some(cmd) => cmd,
        None => {
            eprintln!("skipping: no POSIX shell found");
            return;
        }
    };
    // The helper shell backgrounds a long `sleep`, writes its pid to the file and exits.
    cmd.arg("-c")
        .arg(r#"nohup sleep 1000 >/dev/null 2>&1 & echo $! > "$1""#)
        .arg("sh")
        .arg(pid_file.as_os_str());
    // `setsid` puts the helper, and the `sleep` it starts, in a session of their own,
    // so signalling that process group cannot hit the test runner.
    unsafe {
        cmd.pre_exec(|| {
            let rc = nix::libc::setsid();
            if rc == -1 {
                return Err(io::Error::last_os_error());
            }
            Ok(())
        });
    }
    let mut child = cmd.spawn().expect("spawn helper process");
    let deadline = Instant::now() + Duration::from_secs(2);
    let sleep_pid: u32 = loop {
        if let Ok(text) = std::fs::read_to_string(&pid_file) {
            if let Ok(pid) = text.trim().parse::<u32>() {
                break pid;
            }
        }
        if Instant::now() > deadline {
            panic!("timed out waiting for active pid file");
        }
        tokio::time::sleep(Duration::from_millis(25)).await;
    };
    // Reap the helper so it doesn't become a zombie itself.
    let _ = tokio::time::timeout(Duration::from_secs(2), child.wait())
        .await
        .expect("helper wait timeout");
    let pgid = unsafe { nix::libc::getpgid(sleep_pid as i32) };
    assert_ne!(pgid, -1, "expected getpgid to succeed");
    let _handle = tracker.register(
        "session-active",
        TrackedProcess {
            pid: sleep_pid,
            process_group_id: Some(pgid),
        },
    );
    // Well past the 100 ms unresponsive timeout and several scan intervals.
    tokio::time::sleep(Duration::from_millis(250)).await;
    assert!(
        process_group_alive(pgid),
        "active session should not be reaped without opt-in heartbeats"
    );
    // End the session and ensure the watchdog does cleanup before shutdown.
    tracker.end_session("session-active");
    let deadline = Instant::now() + Duration::from_secs(3);
    while Instant::now() < deadline && process_group_alive(pgid) {
        tokio::time::sleep(Duration::from_millis(25)).await;
    }
    // Without this check a failed cleanup would pass silently and leak the `sleep`.
    assert!(
        !process_group_alive(pgid),
        "expected ended session's process group to be reaped"
    );
    watchdog.shutdown().await;
}
}