"""Daemon monitoring module for tracking Gradle daemons and sessions."""
import asyncio
import subprocess
import uuid
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from threading import Lock
from typing import Any
@dataclass
class DaemonInfo:
"""Information about a Gradle daemon."""
pid: str
status: str
gradle_version: str | None = None
last_activity: datetime | None = None
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
"pid": self.pid,
"status": self.status,
"gradle_version": self.gradle_version,
"last_activity": self.last_activity.isoformat() if self.last_activity else None,
}
@dataclass
class DaemonSession:
    """Record of one in-progress task session bound to a daemon."""

    # Identifier under which the session is tracked.
    session_id: str
    # PID (as a string) of the daemon the session belongs to.
    daemon_pid: str
    # Name of the task the session was opened for.
    task: str
    # Moment the session was created.
    start_time: datetime
    # Bounded buffer: once 1000 entries are held, the oldest are dropped.
    logs: deque = field(default_factory=lambda: deque(maxlen=1000))

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-friendly summary of this session.

        Returns:
            Dictionary with the session's identifying fields, the start time
            in ISO 8601 form, and ``log_count`` (the number of buffered log
            entries rather than the entries themselves).
        """
        started_at = self.start_time.isoformat()
        buffered_entries = len(self.logs)
        return dict(
            session_id=self.session_id,
            daemon_pid=self.daemon_pid,
            task=self.task,
            start_time=started_at,
            log_count=buffered_entries,
        )
class DaemonMonitor:
    """Monitor for Gradle daemons and sessions.

    This class provides methods for tracking daemon status, managing sessions,
    and collecting logs from Gradle operations.

    Session and log bookkeeping is guarded by a single non-reentrant lock.
    Methods that already hold the lock record log entries through
    ``_append_log_locked`` instead of the public ``add_log`` so they never
    try to acquire the lock a second time.
    """

    # Per-daemon log buffers keep only this many most recent entries.
    _MAX_LOG_ENTRIES = 1000
    # Seconds to wait for `gradlew --status` before giving up.
    _STATUS_TIMEOUT_SECONDS = 30
    # Daemon states reported by get_daemons(); every other state is skipped.
    _ACTIVE_STATES = frozenset({"IDLE", "BUSY"})

    def __init__(self, project_root: str | Path | None = None) -> None:
        """Initialize the daemon monitor.

        Args:
            project_root: Root directory of the Gradle project.
                Defaults to current directory.
        """
        self.project_root = Path(project_root) if project_root else Path.cwd()
        self._sessions: dict[str, DaemonSession] = {}
        self._daemon_logs: dict[str, deque] = {}  # pid -> logs
        self._lock = Lock()

    def _find_wrapper_script(self) -> Path:
        """Find the Gradle wrapper script.

        Returns:
            Path to the gradlew script.

        Raises:
            FileNotFoundError: If Gradle wrapper is not found.
        """
        gradle_wrapper = self.project_root / "gradlew"
        if not gradle_wrapper.exists():
            raise FileNotFoundError(
                f"Gradle wrapper not found at {gradle_wrapper}. "
                "Please ensure gradlew script exists in the project root."
            )
        return gradle_wrapper

    @staticmethod
    def _parse_status_output(output: str) -> list[tuple[str, str, str | None]]:
        """Extract active daemon rows from `gradlew --status` output.

        A daemon row is expected to look like ``" 12345 IDLE     8.5"``.
        Header, separator, blank and informational lines are ignored because
        their first token is not a number.

        Args:
            output: Raw text printed by `gradlew --status`.

        Returns:
            List of ``(pid, status, gradle_version)`` tuples, one per daemon
            whose status is IDLE or BUSY. ``gradle_version`` is ``None`` when
            the third column is missing or does not start with a digit.
        """
        rows: list[tuple[str, str, str | None]] = []
        for raw_line in output.splitlines():
            columns = raw_line.split()
            if len(columns) < 2 or not columns[0].isdigit():
                continue

            status = columns[1].upper()
            # Only IDLE and BUSY daemons count as active; STOPPED, STOPPING
            # and any other state are left out.
            if status not in DaemonMonitor._ACTIVE_STATES:
                continue

            version = columns[2] if len(columns) > 2 else None
            # Discard a third column that does not look like a version number.
            if version and not version[0].isdigit():
                version = None

            rows.append((columns[0], status, version))
        return rows

    def get_daemons(self) -> list[DaemonInfo]:
        """Get list of running Gradle daemons.

        Parses the output of `gradlew --status` to extract daemon information.
        This is a best-effort query: a missing wrapper, a timeout, or any
        other failure while running the wrapper yields an empty list instead
        of raising.

        Returns:
            List of DaemonInfo objects describing running daemons.
        """
        try:
            wrapper_script = self._find_wrapper_script()
            result = subprocess.run(
                [str(wrapper_script), "--status"],
                cwd=str(self.project_root),
                capture_output=True,
                text=True,
                timeout=self._STATUS_TIMEOUT_SECONDS,
            )
        except Exception:
            # Deliberately broad: callers treat "cannot query" the same as
            # "no daemons", so every failure maps to an empty result.
            return []

        return [
            DaemonInfo(
                pid=pid,
                status=status,
                gradle_version=gradle_version,
                last_activity=datetime.now(),
            )
            for pid, status, gradle_version in self._parse_status_output(
                result.stdout or ""
            )
        ]

    def _append_log_locked(self, daemon_pid: str, message: str, level: str) -> None:
        """Append a log entry; the caller must already hold ``self._lock``.

        Args:
            daemon_pid: PID of the daemon.
            message: Log message.
            level: Log level label stored with the entry.
        """
        buffer = self._daemon_logs.get(daemon_pid)
        if buffer is None:
            buffer = deque(maxlen=self._MAX_LOG_ENTRIES)
            self._daemon_logs[daemon_pid] = buffer
        buffer.append(
            {
                "timestamp": datetime.now().isoformat(),
                "level": level,
                "message": message,
            }
        )

    def add_log(
        self,
        daemon_pid: str,
        message: str,
        level: str = "INFO",
    ) -> None:
        """Add a log entry for a daemon.

        Args:
            daemon_pid: PID of the daemon.
            message: Log message.
            level: Log level (DEBUG, INFO, WARN, ERROR).
        """
        with self._lock:
            self._append_log_locked(daemon_pid, message, level)

    def get_logs(
        self,
        daemon_pid: str,
        limit: int = 100,
    ) -> list[dict]:
        """Get recent logs for a daemon.

        Args:
            daemon_pid: PID of the daemon.
            limit: Maximum number of log entries to return. A value of zero
                or less returns no entries.

        Returns:
            List of log entry dictionaries, oldest first, containing at most
            the ``limit`` most recent entries.
        """
        # Guard explicitly: a slice of [-0:] would return the whole list.
        if limit <= 0:
            return []
        with self._lock:
            buffer = self._daemon_logs.get(daemon_pid)
            if not buffer:
                return []
            return list(buffer)[-limit:]

    def start_session(
        self,
        daemon_pid: str,
        task: str,
    ) -> str:
        """Start a new session for a daemon task.

        Args:
            daemon_pid: PID of the daemon running the task.
            task: Name of the task being executed.

        Returns:
            Session ID for tracking.
        """
        session_id = str(uuid.uuid4())[:8]
        with self._lock:
            self._sessions[session_id] = DaemonSession(
                session_id=session_id,
                daemon_pid=daemon_pid,
                task=task,
                start_time=datetime.now(),
            )
            # Use the locked helper: add_log() would try to take the
            # non-reentrant lock again and block forever.
            self._append_log_locked(
                daemon_pid,
                f"Started session {session_id} for task: {task}",
                "INFO",
            )
        return session_id

    def end_session(self, session_id: str) -> None:
        """End a session.

        Unknown session IDs are ignored.

        Args:
            session_id: ID of the session to end.
        """
        with self._lock:
            session = self._sessions.pop(session_id, None)
            if session is None:
                return
            # Use the locked helper: add_log() would try to take the
            # non-reentrant lock again and block forever.
            self._append_log_locked(
                session.daemon_pid,
                f"Ended session {session_id} for task: {session.task}",
                "INFO",
            )

    def get_active_sessions(self) -> list[DaemonSession]:
        """Get all active sessions.

        Returns:
            List of active DaemonSession objects.
        """
        with self._lock:
            return list(self._sessions.values())

    @staticmethod
    def _terminate_pid(pid: str) -> dict:
        """Send SIGTERM to one process after validating its PID.

        Args:
            pid: PID of the process, as a string.

        Returns:
            Dictionary with 'success' and 'error' keys. A process that no
            longer exists counts as success.
        """
        import os
        import signal

        try:
            target = int(pid)
        except (TypeError, ValueError):
            return {"success": False, "error": f"Invalid daemon PID: {pid!r}"}
        # Under POSIX kill semantics a PID of zero or below addresses process
        # groups rather than one process, so it must never reach os.kill().
        if target <= 0:
            return {"success": False, "error": f"Invalid daemon PID: {pid!r}"}

        try:
            os.kill(target, signal.SIGTERM)
        except ProcessLookupError:
            return {"success": True, "error": None}  # Already stopped
        except PermissionError:
            return {
                "success": False,
                "error": f"Permission denied to stop daemon {pid}",
            }
        return {"success": True, "error": None}

    async def stop_daemon(self, pid: str | None = None) -> dict:
        """Stop a specific daemon or all daemons.

        The Gradle wrapper is looked up in both modes, so a project without
        a wrapper reports failure even when a PID is given.

        Args:
            pid: PID of the daemon to stop, or None to stop all daemons.
                Must be a positive integer when given.

        Returns:
            Dictionary with 'success' and optional 'error' keys.
        """
        try:
            wrapper_script = self._find_wrapper_script()

            if pid:
                # Stop a specific daemon by signalling its process.
                return self._terminate_pid(pid)

            # Stop all daemons using gradlew --stop
            process = await asyncio.create_subprocess_exec(
                str(wrapper_script),
                "--stop",
                cwd=str(self.project_root),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            _stdout, stderr = await process.communicate()
            if process.returncode == 0:
                return {"success": True, "error": None}

            # Decode leniently so odd bytes in stderr cannot mask the failure.
            error_msg = (
                stderr.decode(errors="replace") if stderr else "Failed to stop daemons"
            )
            return {"success": False, "error": error_msg}
        except Exception as e:
            # Failures (including a missing wrapper) are reported through the
            # result dictionary rather than raised.
            return {"success": False, "error": str(e)}