"""
Graceful Termination Utilities
Provides utilities for gracefully terminating processes with timeout handling,
signal management, and comprehensive cleanup verification.
Architecture Integration:
- Design Patterns: Strategy pattern for termination methods, Template for workflow
- Performance Profile: O(1) signal sending with configurable timeout handling
- Error Handling: Robust fallback to force termination when needed
Technical Decisions:
- Signal Progression: SIGTERM → wait → SIGKILL for reliability
- Process Monitoring: Active polling with exponential backoff
- Platform Support: Cross-platform with OS-specific optimizations
Dependencies & Integration:
- External: psutil (optional) for cross-platform process checks; falls back to os.kill(pid, 0) when it is not installed
- Internal: Audit logging, error handling utilities
Quality Assurance:
- Test Coverage: Unit tests for all termination scenarios
- Timeout Testing: Verification of timeout handling
- Platform Testing: Cross-platform compatibility
Author: ADDER_4 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import asyncio
import os
import signal
import time
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
# Import boundaries
from src.boundaries.audit import AuditCategory, AuditLevel, get_audit_logger
# Import utilities
from src.utils.errors import OperationError
from .contracts_shim import ensure, require
class TerminationMethod(Enum):
    """Outcome category recorded on a :class:`TerminationResult`."""

    # Process was gone before, or went away after, the polite SIGTERM request.
    GRACEFUL = "graceful"

    # Process had to be escalated to SIGKILL.
    FORCED = "forced"

    # Declared for timeout outcomes; not assigned by the code in this module.
    TIMEOUT = "timeout"

    # Process could not be terminated, or an error interrupted the attempt.
    FAILED = "failed"
@dataclass
class TerminationResult:
    """
    Result of a termination attempt.

    ``signals_sent`` may be omitted (or passed as ``None``); it is then
    replaced with a fresh empty list so instances never share a default list.
    """

    # True when the process is no longer running at the end of the attempt.
    success: bool
    # How the outcome was reached (graceful, forced, failed, ...).
    method: TerminationMethod
    # PID the attempt was made against.
    process_id: int
    # Elapsed time of the attempt, in seconds.
    duration_seconds: float
    # Exit status if one could be determined, otherwise None.
    exit_code: Optional[int] = None
    # Names of the signals delivered, in order; None is normalised to [].
    signals_sent: Optional[List[str]] = None
    # Human-readable failure description, if any.
    error: Optional[str] = None

    def __post_init__(self) -> None:
        """Normalise a missing ``signals_sent`` to a new empty list."""
        if self.signals_sent is None:
            self.signals_sent = []
class GracefulTerminator:
    """
    Handles graceful process termination with timeout management.

    Sends SIGTERM, polls until the process disappears or ``timeout_seconds``
    elapses, and escalates to SIGKILL if the process is still running.

    Contracts:
        Preconditions:
            - Process ID must be positive (declared via ``@require``)
            - Timeout should be positive (not validated here)
        Postconditions:
            - A TerminationResult describing the outcome is returned;
              errors raised while signalling are reported in ``result.error``
            - An audit event is written when an audit logger is configured
        Invariants:
            - Every attempt that reaches the signalling stage is counted once
              in exactly one of graceful_success / forced_success / failures
            - Force termination is used only after the graceful wait times out
    """

    # Seconds to wait for the process to disappear after SIGKILL.
    _FORCE_KILL_TIMEOUT_SECONDS = 5

    def __init__(self, timeout_seconds: int = 30):
        """
        Initialize graceful terminator.

        Args:
            timeout_seconds: Maximum time to wait for graceful termination
        """
        self.timeout_seconds = timeout_seconds
        # Stays None until initialize() is awaited; logging is skipped while None.
        self._audit_logger = None
        self._termination_stats = {
            "graceful_success": 0,
            "forced_success": 0,
            "timeouts": 0,
            "failures": 0,
        }

    async def initialize(self) -> None:
        """Initialize terminator with audit logging."""
        self._audit_logger = get_audit_logger()

    @require(lambda self, process_id: process_id > 0)
    @ensure(lambda result: isinstance(result, TerminationResult))
    async def terminate_claude_process(
        self, process_id: int, command_text: Optional[str] = None
    ) -> TerminationResult:
        """
        Terminate Claude Code process gracefully.

        Attempts graceful termination (SIGTERM) first, then falls back to
        forced termination (SIGKILL) if the process is still running after
        ``timeout_seconds``.

        Args:
            process_id: Process ID to terminate
            command_text: Optional termination command; currently ignored by
                this implementation

        Returns:
            TerminationResult with termination details. Errors raised while
            signalling are captured in ``result.error`` rather than raised.
        """
        # Monotonic clock: durations must not jump with wall-clock changes.
        start_time = time.monotonic()
        signals_sent: List[str] = []

        def elapsed() -> float:
            return time.monotonic() - start_time

        try:
            # Nothing to do if the process is already gone. This case is
            # neither counted in the stats nor audit-logged.
            if not await self._is_process_running(process_id):
                return TerminationResult(
                    success=True,
                    method=TerminationMethod.GRACEFUL,
                    process_id=process_id,
                    duration_seconds=0.0,
                    exit_code=0,
                    signals_sent=[],
                )

            # Phase 1: polite request.
            try:
                os.kill(process_id, signal.SIGTERM)
            except ProcessLookupError:
                # Process exited between the existence check and the signal.
                return await self._record_result(
                    TerminationResult(
                        success=True,
                        method=TerminationMethod.GRACEFUL,
                        process_id=process_id,
                        duration_seconds=elapsed(),
                        exit_code=0,
                        signals_sent=signals_sent,
                    ),
                    "graceful_success",
                )
            signals_sent.append("SIGTERM")

            exit_code = await self._wait_for_termination(
                process_id, timeout=self.timeout_seconds
            )
            if exit_code is not None:
                return await self._record_result(
                    TerminationResult(
                        success=True,
                        method=TerminationMethod.GRACEFUL,
                        process_id=process_id,
                        duration_seconds=elapsed(),
                        exit_code=exit_code,
                        signals_sent=signals_sent,
                    ),
                    "graceful_success",
                )

            # Phase 2: graceful wait timed out, escalate.
            # "timeouts" is an auxiliary counter; the final outcome of this
            # attempt is still counted below as forced_success or failures.
            self._termination_stats["timeouts"] += 1
            try:
                os.kill(process_id, signal.SIGKILL)
            except ProcessLookupError:
                # Process exited just before SIGKILL could be delivered.
                return await self._record_result(
                    TerminationResult(
                        success=True,
                        method=TerminationMethod.FORCED,
                        process_id=process_id,
                        duration_seconds=elapsed(),
                        exit_code=-9,
                        signals_sent=signals_sent,
                    ),
                    "forced_success",
                )
            signals_sent.append("SIGKILL")

            exit_code = await self._wait_for_termination(
                process_id, timeout=self._FORCE_KILL_TIMEOUT_SECONDS
            )
            duration = elapsed()
            if exit_code is not None or not await self._is_process_running(
                process_id
            ):
                return await self._record_result(
                    TerminationResult(
                        success=True,
                        method=TerminationMethod.FORCED,
                        process_id=process_id,
                        duration_seconds=duration,
                        # A falsy code (None, or the 0 placeholder returned
                        # when the real status is unavailable) is reported
                        # as -9, i.e. "killed by SIGKILL".
                        exit_code=exit_code or -9,
                        signals_sent=signals_sent,
                    ),
                    "forced_success",
                )

            # Still running even after SIGKILL.
            return await self._record_result(
                TerminationResult(
                    success=False,
                    method=TerminationMethod.FAILED,
                    process_id=process_id,
                    duration_seconds=elapsed(),
                    signals_sent=signals_sent,
                    error="Process refused to terminate",
                ),
                "failures",
            )
        except Exception as e:
            # Unexpected error while signalling or polling (e.g. PermissionError).
            return await self._record_result(
                TerminationResult(
                    success=False,
                    method=TerminationMethod.FAILED,
                    process_id=process_id,
                    duration_seconds=elapsed(),
                    signals_sent=signals_sent,
                    error=str(e),
                ),
                "failures",
            )

    async def _record_result(
        self, result: TerminationResult, stat_key: str
    ) -> TerminationResult:
        """
        Count ``result`` under ``stat_key``, audit-log it, and return it.

        Audit logging is best-effort here: a logging failure must not turn an
        already-determined termination outcome into a different one, so it is
        reported through the standard ``logging`` module instead of raised.
        """
        self._termination_stats[stat_key] += 1
        try:
            await self._log_termination(result)
        except Exception:
            import logging

            logging.getLogger(__name__).warning(
                "Audit logging failed for termination of process %s",
                result.process_id,
                exc_info=True,
            )
        return result

    async def terminate_process_group(
        self, process_ids: List[int], parallel: bool = True
    ) -> Dict[int, TerminationResult]:
        """
        Terminate multiple processes.

        Args:
            process_ids: List of process IDs to terminate
            parallel: Whether to terminate in parallel

        Returns:
            Dict mapping process ID to termination result. In parallel mode
            an exception from an individual termination is converted into a
            FAILED result; in sequential mode it propagates to the caller.
        """
        results: Dict[int, TerminationResult] = {}
        if not parallel:
            # Terminate processes sequentially
            for pid in process_ids:
                results[pid] = await self.terminate_claude_process(pid)
            return results

        # Terminate all processes in parallel
        outcomes = await asyncio.gather(
            *(self.terminate_claude_process(pid) for pid in process_ids),
            return_exceptions=True,
        )
        for pid, outcome in zip(process_ids, outcomes):
            # BaseException, not Exception: gather() can hand back
            # asyncio.CancelledError, which does not derive from Exception.
            if isinstance(outcome, BaseException):
                results[pid] = TerminationResult(
                    success=False,
                    method=TerminationMethod.FAILED,
                    process_id=pid,
                    duration_seconds=0.0,
                    # Some exceptions stringify to ""; fall back to the type name.
                    error=str(outcome) or type(outcome).__name__,
                )
            else:
                results[pid] = outcome
        return results

    async def _is_process_running(self, process_id: int) -> bool:
        """
        Check if process is running.

        Args:
            process_id: Process ID to check

        Returns:
            bool: True if process is running
        """
        try:
            # Try using psutil if available
            import psutil
        except ImportError:
            # Fallback to OS check: signal 0 probes without delivering anything.
            try:
                os.kill(process_id, 0)
            except ProcessLookupError:
                return False
            except PermissionError:
                # Process exists but we can't access it
                return True
            return True
        return psutil.pid_exists(process_id)

    async def _wait_for_termination(
        self, process_id: int, timeout: float
    ) -> Optional[int]:
        """
        Wait for process to terminate.

        Polls with a growing interval (100 ms, x1.5 per round, capped at 1 s).
        The process is always checked at least once, and once more when the
        deadline is reached; sleeps never extend past the deadline.

        Args:
            process_id: Process ID to monitor
            timeout: Maximum time to wait, in seconds

        Returns:
            Optional[int]: Exit code if terminated (0 when the real code is
            unavailable), None if the process is still running at the deadline
        """
        deadline = time.monotonic() + timeout
        check_interval = 0.1  # Start with 100ms checks
        while True:
            if not await self._is_process_running(process_id):
                return self._read_exit_code(process_id)
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None  # Timeout
            await asyncio.sleep(min(check_interval, remaining))
            check_interval = min(check_interval * 1.5, 1.0)  # Cap at 1 second

    @staticmethod
    def _read_exit_code(process_id: int) -> int:
        """
        Best-effort lookup of the exit code of a process that has gone away.

        Returns 0 whenever the code cannot be determined (psutil missing,
        process no longer known, or psutil reporting no status). Never returns
        None, because callers treat None as "still running".
        """
        try:
            import psutil

            exit_code = psutil.Process(process_id).wait(0)
        except Exception:
            # Deliberately broad: any failure here just means "code unknown".
            return 0  # Default exit code
        return exit_code if exit_code is not None else 0

    async def _log_termination(self, result: TerminationResult) -> None:
        """
        Log termination result to the audit logger, if one is configured.

        Args:
            result: Termination result to log
        """
        if not self._audit_logger:
            return
        level = AuditLevel.INFO if result.success else AuditLevel.WARNING
        await self._audit_logger.log_event(
            level=level,
            category=AuditCategory.PROCESS_MANAGEMENT,
            operation="process_termination",
            resource_type="process",
            resource_id=str(result.process_id),
            success=result.success,
            metadata={
                "method": result.method.value,
                "duration_seconds": result.duration_seconds,
                "exit_code": result.exit_code,
                "signals_sent": result.signals_sent,
                "error": result.error,
            },
        )

    def get_termination_stats(self) -> Dict[str, Any]:
        """
        Get termination statistics.

        Returns:
            The raw counters plus ``total_terminations`` and percentage rates.
            The total is the number of final outcomes (graceful + forced +
            failures); ``timeouts`` is excluded because every timed-out
            attempt is also counted under its final outcome.
        """
        stats = self._termination_stats
        total = (
            stats["graceful_success"] + stats["forced_success"] + stats["failures"]
        )

        def rate(key: str) -> float:
            return stats[key] / total * 100 if total > 0 else 0.0

        return {
            **stats,
            "total_terminations": total,
            "graceful_rate": rate("graceful_success"),
            "force_required_rate": rate("forced_success"),
            "failure_rate": rate("failures"),
        }
# Public API of this module
__all__ = [
    "GracefulTerminator",
    "TerminationMethod",
    "TerminationResult",
]