"""
Status and Health Monitoring Types - Agent Orchestration Platform
Architecture Integration:
- Design Patterns: Observer pattern with real-time health monitoring
- Security Model: Secure health reporting with access control
- Performance Profile: Efficient health checks with minimal overhead
Technical Decisions:
- Health Metrics: Comprehensive system and component health tracking
- Status Reporting: Real-time status updates with historical tracking
- Alert System: Multi-level alerting with escalation and recovery
- Diagnostic Data: Rich diagnostic information for troubleshooting
Dependencies & Integration:
- External: None (stdlib only for maximum portability)
- Internal: Foundation for all monitoring and diagnostics
Quality Assurance:
- Test Coverage: Property-based testing for all monitoring operations
- Error Handling: Comprehensive health validation with diagnostics
Author: Adder_3 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
from dataclasses import dataclass, field, replace
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Union

from .agent import AgentStatus
from .ids import AgentId, ProcessId, SessionId, TabId
from .resources import AlertLevel, ResourceAlert, ResourceUsage
from .session import SessionStatus
# ============================================================================
# HEALTH ENUMERATION - Health States and Severity Levels
# ============================================================================
class HealthStatus(Enum):
    """Health levels reported for a component or for the system as a whole.

    Each member's value is its lowercase name. Declaration order matters:
    code that builds per-status counters iterates this enum directly.
    """

    # Only HEALTHY makes ComponentHealth.is_healthy() return True.
    HEALTHY = "healthy"
    WARNING = "warning"
    # CRITICAL and EMERGENCY are the two levels that
    # ComponentHealth.needs_attention() reports as needing attention.
    CRITICAL = "critical"
    EMERGENCY = "emergency"
    # Fallback used by the factory helpers when a source status has no mapping.
    UNKNOWN = "unknown"
class ComponentType(Enum):
    """Kinds of component a health or diagnostic record can describe.

    Each member's value is its lowercase name; that value is what appears
    in formatted monitoring error messages and diagnostic summaries.
    """

    # Host-level and service-level components.
    SYSTEM = "system"
    SERVER = "server"

    # Orchestration entities.
    SESSION = "session"
    AGENT = "agent"
    PROCESS = "process"
    ITERM_TAB = "iterm_tab"

    # Infrastructure resources.
    NETWORK = "network"
    STORAGE = "storage"
class DiagnosticLevel(Enum):
    """Level of detail recorded on a DiagnosticInfo object.

    Each member's value is its lowercase name.
    """

    BASIC = "basic"  # level used by create_basic_diagnostic_info()
    DETAILED = "detailed"
    COMPREHENSIVE = "comprehensive"
    DEBUG = "debug"
# ============================================================================
# MONITORING EXCEPTIONS - Typed Error Handling
# ============================================================================
class MonitoringError(Exception):
    """Base exception for monitoring-related errors.

    The exception text is rendered as ``[<error_code>] <component>: <message>``
    where ``<component>`` is ``component_type.value`` or, when no component
    type is given, the literal ``monitoring``.

    Attributes:
        message: The raw message exactly as passed by the caller.
        component_type: Component the error relates to, or None.
        error_code: Short identifier placed in brackets at the start of
            the rendered text.
    """

    def __init__(
        self,
        message: str,
        component_type: Optional[ComponentType] = None,
        error_code: str = "MONITORING_ERROR",
    ):
        """Build the error and its formatted text.

        Args:
            message: Human-readable description of the failure.
            component_type: Component type the failure belongs to; may be
                omitted for errors not tied to one component.
            error_code: Identifier for the error category.
        """
        # Keep the unformatted pieces so handlers do not have to parse str(exc).
        self.message = message
        self.component_type = component_type
        self.error_code = error_code

        origin = component_type.value if component_type else "monitoring"
        super().__init__(f"[{error_code}] {origin}: {message}")
class HealthCheckError(MonitoringError):
    """Monitoring error raised when a health check fails.

    Always carries the error code ``HEALTH_CHECK_ERROR``.
    """

    def __init__(self, message: str, component_type: Optional[ComponentType] = None):
        """Create the error.

        Args:
            message: Description of the failed or inconsistent health check.
            component_type: Component type the check was run against, if any.
        """
        super().__init__(message, component_type, "HEALTH_CHECK_ERROR")
class DiagnosticError(MonitoringError):
    """Monitoring error raised when diagnostic collection fails.

    Always carries the error code ``DIAGNOSTIC_ERROR``.
    """

    def __init__(self, message: str, component_type: Optional[ComponentType] = None):
        """Create the error.

        Args:
            message: Description of the diagnostic collection failure.
            component_type: Component type being diagnosed, if any.
        """
        super().__init__(message, component_type, "DIAGNOSTIC_ERROR")
# ============================================================================
# COMPONENT HEALTH - Individual Component Health Tracking
# ============================================================================
@dataclass(frozen=True)
class ComponentHealth:
    """Immutable health snapshot for a single monitored component.

    Instances are frozen: a status change is expressed by building a new
    snapshot with :meth:`with_health_update`. Construction validates the
    fields and raises ``ValueError`` when any of these rules is broken:

    - ``component_id`` and ``component_name`` are non-blank
    - ``check_interval_seconds`` lies in ``(0, 3600]``
    - ``uptime_seconds``, ``error_count`` and ``warning_count`` are >= 0
    - ``response_time_ms``, when given, is >= 0
    - ``success_rate_percent`` / ``availability_percent``, when given,
      lie in ``[0, 100]``
    - ``status_message`` is at most 500 characters

    Note that ``frozen=True`` only blocks attribute rebinding; the
    ``resource_alerts`` list and ``diagnostic_data`` dict themselves remain
    mutable objects.
    """

    # Component identification
    component_id: str
    component_type: ComponentType
    component_name: str

    # Health status
    health_status: HealthStatus
    last_check_time: datetime = field(default_factory=datetime.now)
    # None means no follow-up check is scheduled; treated as overdue.
    next_check_time: Optional[datetime] = None
    check_interval_seconds: int = 30

    # Health metrics
    uptime_seconds: int = 0
    error_count: int = 0
    warning_count: int = 0
    last_error_time: Optional[datetime] = None
    last_warning_time: Optional[datetime] = None

    # Performance indicators (None = not measured)
    response_time_ms: Optional[float] = None
    success_rate_percent: Optional[float] = None
    availability_percent: Optional[float] = None

    # Resource health (if applicable)
    resource_usage: Optional[ResourceUsage] = None
    resource_alerts: List[ResourceAlert] = field(default_factory=list)

    # Diagnostic information
    diagnostic_data: Dict[str, Any] = field(default_factory=dict)
    status_message: str = ""

    def __post_init__(self):
        """Validate the snapshot; raise ValueError on the first violation."""
        # Component identification validation
        if not self.component_id or not self.component_id.strip():
            raise ValueError("Component ID cannot be empty")
        if not self.component_name or not self.component_name.strip():
            raise ValueError("Component name cannot be empty")

        # Check interval validation
        if self.check_interval_seconds <= 0 or self.check_interval_seconds > 3600:
            raise ValueError(
                f"Check interval {self.check_interval_seconds} must be in range (0, 3600]"
            )

        # Uptime validation
        if self.uptime_seconds < 0:
            raise ValueError("Uptime cannot be negative")

        # Count validation
        if self.error_count < 0 or self.warning_count < 0:
            raise ValueError("Error and warning counts cannot be negative")

        # Performance metrics validation
        if self.response_time_ms is not None and self.response_time_ms < 0:
            raise ValueError("Response time cannot be negative")
        if self.success_rate_percent is not None and not (
            0 <= self.success_rate_percent <= 100
        ):
            raise ValueError("Success rate must be in range [0, 100]")
        if self.availability_percent is not None and not (
            0 <= self.availability_percent <= 100
        ):
            raise ValueError("Availability must be in range [0, 100]")

        # Status message validation
        if len(self.status_message) > 500:
            raise ValueError("Status message too long")

    def is_healthy(self) -> bool:
        """Return True only when the status is exactly HEALTHY."""
        return self.health_status == HealthStatus.HEALTHY

    def needs_attention(self) -> bool:
        """Return True when the status is CRITICAL or EMERGENCY."""
        return self.health_status in (HealthStatus.CRITICAL, HealthStatus.EMERGENCY)

    def is_overdue_for_check(self) -> bool:
        """Return True when no next check is scheduled or its time has passed."""
        if self.next_check_time is None:
            return True
        return datetime.now() > self.next_check_time

    def get_time_since_last_check(self) -> timedelta:
        """Return the time elapsed between ``last_check_time`` and now."""
        return datetime.now() - self.last_check_time

    def get_error_rate(self, time_window_hours: int = 24) -> float:
        """Return errors per hour over the window, capped by uptime.

        The total ``error_count`` is divided by the smaller of the requested
        window and the component's uptime in hours.

        Args:
            time_window_hours: Length of the window in hours.

        Returns:
            float: Errors per hour; 0.0 when the window or the uptime is
            not positive.
        """
        if time_window_hours <= 0:
            return 0.0
        uptime_hours = self.uptime_seconds / 3600
        if uptime_hours <= 0:
            return 0.0
        window_hours = min(time_window_hours, uptime_hours)
        return self.error_count / window_hours

    def get_health_score(self) -> float:
        """Return a health score from 0 (worst) to 100 (best).

        The score starts from a fixed value per status, is averaged with
        the success rate and then with the availability when those are set,
        and is reduced by 20% when the last error occurred less than five
        minutes ago.

        Returns:
            float: Score clamped to ``[0.0, 100.0]``.
        """
        status_scores = {
            HealthStatus.HEALTHY: 100,
            HealthStatus.WARNING: 75,
            HealthStatus.CRITICAL: 25,
            HealthStatus.EMERGENCY: 0,
            HealthStatus.UNKNOWN: 50,
        }
        # Start from a float so the declared return type holds even when no
        # performance metric is available to average in.
        score = float(status_scores[self.health_status])

        # Adjust based on performance metrics
        if self.success_rate_percent is not None:
            score = (score + self.success_rate_percent) / 2
        if self.availability_percent is not None:
            score = (score + self.availability_percent) / 2

        # Penalize for recent errors
        if self.last_error_time:
            time_since_error = datetime.now() - self.last_error_time
            if time_since_error < timedelta(minutes=5):
                score *= 0.8  # 20% penalty for recent errors

        return max(0.0, min(100.0, score))

    def get_critical_alerts(self) -> List[ResourceAlert]:
        """Return the resource alerts whose ``is_critical_or_emergency()`` is true."""
        return [
            alert for alert in self.resource_alerts if alert.is_critical_or_emergency()
        ]

    def with_health_update(
        self, new_status: HealthStatus, status_message: str = ""
    ) -> "ComponentHealth":
        """Return a copy of this snapshot carrying a new status.

        The copy's ``last_check_time`` is the current time and its
        ``next_check_time`` is exactly ``check_interval_seconds`` later.
        All other fields are carried over unchanged; the alert list and the
        diagnostic dict are shared with the original, not copied.

        Args:
            new_status: Health status for the new snapshot.
            status_message: Status message for the new snapshot; defaults
                to an empty string rather than the previous message.

        Returns:
            ComponentHealth: New snapshot with the updated status.
        """
        # Read the clock once so both timestamps derive from the same instant.
        checked_at = datetime.now()
        return replace(
            self,
            health_status=new_status,
            last_check_time=checked_at,
            next_check_time=checked_at
            + timedelta(seconds=self.check_interval_seconds),
            status_message=status_message,
        )
# ============================================================================
# SYSTEM HEALTH - Aggregate System Health Monitoring
# ============================================================================
@dataclass(frozen=True)
class SystemHealth:
    """Immutable aggregate health snapshot across monitored components.

    Construction validates the summary numbers and raises ``ValueError``
    when any of these rules is broken:

    - no per-status component count is negative and the five counts sum
      to ``total_components``
    - ``health_score`` lies in ``[0, 100]``
    - no alert count is negative and critical + warning + info alerts do
      not exceed ``total_alerts``
    - ``average_response_time_ms``, when given, is >= 0
    - the average success rate and availability, when given, lie in
      ``[0, 100]``

    The contents of ``component_health`` are not cross-checked against the
    counts; callers are responsible for keeping them consistent.
    """

    # Overall system status
    overall_health: HealthStatus
    health_score: float  # 0-100
    last_update_time: datetime = field(default_factory=datetime.now)

    # Component health summary
    total_components: int = 0
    healthy_components: int = 0
    warning_components: int = 0
    critical_components: int = 0
    emergency_components: int = 0
    unknown_components: int = 0

    # Component health details, keyed by component id
    component_health: Dict[str, ComponentHealth] = field(default_factory=dict)

    # Alert summary
    total_alerts: int = 0
    critical_alerts: int = 0
    warning_alerts: int = 0
    info_alerts: int = 0

    # Performance summary (None = not available)
    average_response_time_ms: Optional[float] = None
    average_success_rate_percent: Optional[float] = None
    average_availability_percent: Optional[float] = None

    def __post_init__(self):
        """Validate the summary; raise ValueError on the first violation."""
        # Component count validation
        component_counts = [
            self.healthy_components,
            self.warning_components,
            self.critical_components,
            self.emergency_components,
            self.unknown_components,
        ]
        if any(count < 0 for count in component_counts):
            raise ValueError("Component counts cannot be negative")
        if sum(component_counts) != self.total_components:
            raise ValueError("Component count breakdown must sum to total")

        # Health score validation
        if not (0 <= self.health_score <= 100):
            raise ValueError("Health score must be in range [0, 100]")

        # Alert count validation
        alert_counts = [
            self.total_alerts,
            self.critical_alerts,
            self.warning_alerts,
            self.info_alerts,
        ]
        if any(count < 0 for count in alert_counts):
            raise ValueError("Alert counts cannot be negative")
        # The breakdown may be smaller than the total, but never larger.
        if (
            self.critical_alerts + self.warning_alerts + self.info_alerts
            > self.total_alerts
        ):
            raise ValueError("Alert breakdown cannot exceed total")

        # Performance metrics validation
        if (
            self.average_response_time_ms is not None
            and self.average_response_time_ms < 0
        ):
            raise ValueError("Average response time cannot be negative")
        if self.average_success_rate_percent is not None and not (
            0 <= self.average_success_rate_percent <= 100
        ):
            raise ValueError("Average success rate must be in range [0, 100]")
        if self.average_availability_percent is not None and not (
            0 <= self.average_availability_percent <= 100
        ):
            raise ValueError("Average availability must be in range [0, 100]")

    def is_system_healthy(self) -> bool:
        """Return True only when the overall status is exactly HEALTHY."""
        return self.overall_health == HealthStatus.HEALTHY

    def requires_immediate_attention(self) -> bool:
        """Return True when the system needs immediate attention.

        That is the case when the overall status is CRITICAL or EMERGENCY,
        when at least one critical alert is counted, or when at least one
        component is counted as being in EMERGENCY.
        """
        return (
            self.overall_health in (HealthStatus.CRITICAL, HealthStatus.EMERGENCY)
            or self.critical_alerts > 0
            or self.emergency_components > 0
        )

    def get_component_health_distribution(self) -> Dict[str, float]:
        """Return the share of components per status as percentages.

        Returns:
            Dict[str, float]: Maps each status value to the percentage of
            components in that status; every entry is 0.0 when
            ``total_components`` is zero.
        """
        if self.total_components == 0:
            return {status.value: 0.0 for status in HealthStatus}
        return {
            "healthy": (self.healthy_components / self.total_components) * 100,
            "warning": (self.warning_components / self.total_components) * 100,
            "critical": (self.critical_components / self.total_components) * 100,
            "emergency": (self.emergency_components / self.total_components) * 100,
            "unknown": (self.unknown_components / self.total_components) * 100,
        }

    def get_worst_components(self, limit: int = 5) -> List[ComponentHealth]:
        """Return up to ``limit`` components with the lowest health scores.

        Args:
            limit: Maximum number of components to return. Zero or a
                negative value yields an empty list.

        Returns:
            List[ComponentHealth]: Components ordered from lowest to
            highest health score; ties keep their dictionary order.
        """
        # A negative limit must not reach the slice below: [:-1] would
        # silently return all but the last component instead of none.
        if limit <= 0:
            return []
        ranked = sorted(
            self.component_health.values(), key=lambda c: c.get_health_score()
        )
        return ranked[:limit]

    def get_components_needing_attention(self) -> List[ComponentHealth]:
        """Return the components whose ``needs_attention()`` is true."""
        return [
            component
            for component in self.component_health.values()
            if component.needs_attention()
        ]

    def get_overdue_health_checks(self) -> List[ComponentHealth]:
        """Return the components whose ``is_overdue_for_check()`` is true."""
        return [
            component
            for component in self.component_health.values()
            if component.is_overdue_for_check()
        ]
# ============================================================================
# DIAGNOSTIC INFORMATION - Rich Diagnostic Data Collection
# ============================================================================
@dataclass(frozen=True)
class DiagnosticInfo:
    """Immutable bundle of diagnostic data collected for one component.

    Construction raises ``ValueError`` when ``component_id`` is blank, when
    there are more than 100 recent errors, more than 100 recent warnings or
    more than 50 log excerpts, or when any of those entries is longer than
    1000 characters.
    """

    # Diagnostic metadata
    component_id: str
    component_type: ComponentType
    diagnostic_level: DiagnosticLevel
    collection_time: datetime = field(default_factory=datetime.now)

    # System information
    system_info: Dict[str, Any] = field(default_factory=dict)
    process_info: Dict[str, Any] = field(default_factory=dict)
    network_info: Dict[str, Any] = field(default_factory=dict)

    # Performance data
    performance_metrics: Dict[str, float] = field(default_factory=dict)
    resource_utilization: Dict[str, float] = field(default_factory=dict)

    # Error and log information; the last entry is treated as the latest
    recent_errors: List[str] = field(default_factory=list)
    recent_warnings: List[str] = field(default_factory=list)
    log_excerpts: List[str] = field(default_factory=list)

    # Configuration and state
    configuration_snapshot: Dict[str, Any] = field(default_factory=dict)
    state_information: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Check the identifier, then collection sizes, then entry lengths."""
        if not (self.component_id and self.component_id.strip()):
            raise ValueError("Component ID cannot be empty")

        # All size caps are enforced before any individual entry is inspected.
        size_rules = (
            (self.recent_errors, 100, "Too many recent errors"),
            (self.recent_warnings, 100, "Too many recent warnings"),
            (self.log_excerpts, 50, "Too many log excerpts"),
        )
        for entries, cap, complaint in size_rules:
            if len(entries) > cap:
                raise ValueError(complaint)

        max_string_length = 1000
        length_rules = (
            (self.recent_errors, "Error message too long"),
            (self.recent_warnings, "Warning message too long"),
            (self.log_excerpts, "Log excerpt too long"),
        )
        for entries, complaint in length_rules:
            if any(len(entry) > max_string_length for entry in entries):
                raise ValueError(complaint)

    def get_diagnostic_summary(self) -> Dict[str, Any]:
        """Return a condensed, plain-value overview of this record.

        Returns:
            Dict[str, Any]: Identification fields, the collection time in
            ISO format, error/warning counts, presence flags for the
            performance, resource and system data, and the key names of
            the configuration and state mappings.
        """
        summary: Dict[str, Any] = {}
        summary["component_id"] = self.component_id
        summary["component_type"] = self.component_type.value
        summary["diagnostic_level"] = self.diagnostic_level.value
        summary["collection_time"] = self.collection_time.isoformat()
        summary["error_count"] = len(self.recent_errors)
        summary["warning_count"] = len(self.recent_warnings)
        summary["has_performance_data"] = bool(self.performance_metrics)
        summary["has_resource_data"] = bool(self.resource_utilization)
        summary["has_system_info"] = bool(self.system_info)
        summary["configuration_keys"] = list(self.configuration_snapshot.keys())
        summary["state_keys"] = list(self.state_information.keys())
        return summary

    def has_errors(self) -> bool:
        """Return True when at least one recent error is recorded."""
        return bool(self.recent_errors)

    def has_warnings(self) -> bool:
        """Return True when at least one recent warning is recorded."""
        return bool(self.recent_warnings)

    def get_latest_error(self) -> Optional[str]:
        """Return the last recorded error, or None when there is none."""
        if not self.recent_errors:
            return None
        return self.recent_errors[-1]

    def get_latest_warning(self) -> Optional[str]:
        """Return the last recorded warning, or None when there is none."""
        if not self.recent_warnings:
            return None
        return self.recent_warnings[-1]
# ============================================================================
# HEALTH MONITORING FACTORY FUNCTIONS - Health Object Creation
# ============================================================================
def create_agent_health(
    agent_id: AgentId, agent_name: str, agent_status: AgentStatus
) -> ComponentHealth:
    """Build a ComponentHealth record describing an agent.

    The agent's lifecycle status is translated to a health status; any
    status without an entry in the table maps to UNKNOWN. The raw status
    value is recorded both in the status message and in the diagnostic
    data under ``agent_status``.

    Args:
        agent_id: Agent identifier; its ``str()`` form becomes the
            component id.
        agent_name: Agent display name.
        agent_status: Current agent status.

    Returns:
        ComponentHealth: Health record of type AGENT.
    """
    health_for_status = {
        # Steady states
        AgentStatus.ACTIVE: HealthStatus.HEALTHY,
        AgentStatus.IDLE: HealthStatus.HEALTHY,
        # Transitional states
        AgentStatus.CREATED: HealthStatus.WARNING,
        AgentStatus.STARTING: HealthStatus.WARNING,
        AgentStatus.RESTARTING: HealthStatus.WARNING,
        AgentStatus.TERMINATING: HealthStatus.WARNING,
        # Failed or gone
        AgentStatus.ERROR: HealthStatus.CRITICAL,
        AgentStatus.TERMINATED: HealthStatus.CRITICAL,
    }
    mapped_health = health_for_status.get(agent_status, HealthStatus.UNKNOWN)
    raw_status = agent_status.value

    return ComponentHealth(
        component_id=str(agent_id),
        component_type=ComponentType.AGENT,
        component_name=agent_name,
        health_status=mapped_health,
        status_message=f"Agent status: {raw_status}",
        diagnostic_data={"agent_status": raw_status},
    )
def create_session_health(
    session_id: SessionId, session_name: str, session_status: SessionStatus
) -> ComponentHealth:
    """Build a ComponentHealth record describing a session.

    The session's lifecycle status is translated to a health status; any
    status without an entry in the table maps to UNKNOWN. The raw status
    value is recorded both in the status message and in the diagnostic
    data under ``session_status``.

    Args:
        session_id: Session identifier; its ``str()`` form becomes the
            component id.
        session_name: Session display name.
        session_status: Current session status.

    Returns:
        ComponentHealth: Health record of type SESSION.
    """
    health_for_status = {
        # Steady states
        SessionStatus.ACTIVE: HealthStatus.HEALTHY,
        SessionStatus.IDLE: HealthStatus.HEALTHY,
        # Transitional states
        SessionStatus.CREATED: HealthStatus.WARNING,
        SessionStatus.INITIALIZING: HealthStatus.WARNING,
        SessionStatus.RECOVERING: HealthStatus.WARNING,
        SessionStatus.TERMINATING: HealthStatus.WARNING,
        # Failed or gone
        SessionStatus.ERROR: HealthStatus.CRITICAL,
        SessionStatus.TERMINATED: HealthStatus.CRITICAL,
    }
    mapped_health = health_for_status.get(session_status, HealthStatus.UNKNOWN)
    raw_status = session_status.value

    return ComponentHealth(
        component_id=str(session_id),
        component_type=ComponentType.SESSION,
        component_name=session_name,
        health_status=mapped_health,
        status_message=f"Session status: {raw_status}",
        diagnostic_data={"session_status": raw_status},
    )
def create_process_health(
    process_id: ProcessId, process_name: str, is_running: bool
) -> ComponentHealth:
    """Build a ComponentHealth record describing a process.

    A running process is reported as HEALTHY, a stopped one as CRITICAL.
    The running flag is also stored in the diagnostic data under
    ``is_running``.

    Args:
        process_id: Process identifier; its ``str()`` form becomes the
            component id.
        process_name: Process display name.
        is_running: Whether the process is currently running.

    Returns:
        ComponentHealth: Health record of type PROCESS.
    """
    if is_running:
        mapped_health = HealthStatus.HEALTHY
        state_text = "running"
    else:
        mapped_health = HealthStatus.CRITICAL
        state_text = "not running"

    return ComponentHealth(
        component_id=str(process_id),
        component_type=ComponentType.PROCESS,
        component_name=process_name,
        health_status=mapped_health,
        status_message=f"Process {state_text}",
        diagnostic_data={"is_running": is_running},
    )
def _mean_or_none(values: List[float]) -> Optional[float]:
    """Return the arithmetic mean of *values*, or None when it is empty."""
    return sum(values) / len(values) if values else None


def create_system_health_from_components(
    components: List[ComponentHealth],
) -> SystemHealth:
    """Aggregate individual component health into one SystemHealth.

    - The overall status is the worst status present, ranked
      EMERGENCY > CRITICAL > WARNING > UNKNOWN > HEALTHY.
    - The health score is the mean of the components' health scores.
    - Alerts are counted per level; EMERGENCY alerts are folded into the
      critical alert count.
    - Each performance average is the mean over the components that report
      that metric, and stays None when no component reports it.

    Components are stored keyed by ``component_id``; if two components share
    an id, only the later one is kept in the mapping while both are still
    counted.

    Args:
        components: Component health objects to aggregate.

    Returns:
        SystemHealth: Aggregate health; for an empty input, a record with
        status UNKNOWN and score 0.0.
    """
    if not components:
        return SystemHealth(overall_health=HealthStatus.UNKNOWN, health_score=0.0)

    status_counts = {status: 0 for status in HealthStatus}
    alert_counts = {level: 0 for level in AlertLevel}
    total_alerts = 0
    health_scores = []
    response_times = []
    success_rates = []
    availabilities = []

    # Single pass: gather every per-component figure the summary needs.
    for component in components:
        status_counts[component.health_status] += 1
        health_scores.append(component.get_health_score())
        for alert in component.resource_alerts:
            alert_counts[alert.alert_level] += 1
            total_alerts += 1
        # Metrics are optional per component; only reported values are averaged.
        if component.response_time_ms is not None:
            response_times.append(component.response_time_ms)
        if component.success_rate_percent is not None:
            success_rates.append(component.success_rate_percent)
        if component.availability_percent is not None:
            availabilities.append(component.availability_percent)

    # Worst status wins; HEALTHY only when nothing else is present.
    severity_order = (
        HealthStatus.EMERGENCY,
        HealthStatus.CRITICAL,
        HealthStatus.WARNING,
        HealthStatus.UNKNOWN,
    )
    overall_health = next(
        (status for status in severity_order if status_counts[status] > 0),
        HealthStatus.HEALTHY,
    )

    return SystemHealth(
        overall_health=overall_health,
        health_score=sum(health_scores) / len(health_scores),
        total_components=len(components),
        healthy_components=status_counts[HealthStatus.HEALTHY],
        warning_components=status_counts[HealthStatus.WARNING],
        critical_components=status_counts[HealthStatus.CRITICAL],
        emergency_components=status_counts[HealthStatus.EMERGENCY],
        unknown_components=status_counts[HealthStatus.UNKNOWN],
        component_health={
            component.component_id: component for component in components
        },
        total_alerts=total_alerts,
        critical_alerts=alert_counts[AlertLevel.CRITICAL]
        + alert_counts[AlertLevel.EMERGENCY],
        warning_alerts=alert_counts[AlertLevel.WARNING],
        info_alerts=alert_counts[AlertLevel.INFO],
        average_response_time_ms=_mean_or_none(response_times),
        average_success_rate_percent=_mean_or_none(success_rates),
        average_availability_percent=_mean_or_none(availabilities),
    )
def create_basic_diagnostic_info(
    component_id: str, component_type: ComponentType
) -> DiagnosticInfo:
    """Build a DiagnosticInfo at the BASIC level for a component.

    The system information is a fixed placeholder mapping; it is not
    probed from the running host.

    Args:
        component_id: Component identifier.
        component_type: Type of component.

    Returns:
        DiagnosticInfo: Record with only the basic system info and state
        populated.
    """
    placeholder_system_info = {"platform": "macos", "python_version": "3.9+"}
    initial_state = {"diagnostic_level": "basic"}
    return DiagnosticInfo(
        component_id=component_id,
        component_type=component_type,
        diagnostic_level=DiagnosticLevel.BASIC,
        system_info=placeholder_system_info,
        state_information=initial_state,
    )
# ============================================================================
# PERFORMANCE METRICS - System Performance Monitoring
# ============================================================================
@dataclass(frozen=True)
class PerformanceMetrics:
    """Immutable set of performance measurements.

    Construction raises ``ValueError`` when a timing or throughput value is
    negative, when a percentage lies outside ``[0, 100]``, or when the
    measurement window is not positive.
    """

    # Response time metrics
    average_response_time_ms: float = 0.0
    p95_response_time_ms: float = 0.0
    p99_response_time_ms: float = 0.0
    max_response_time_ms: float = 0.0

    # Throughput metrics
    requests_per_second: float = 0.0
    operations_per_minute: float = 0.0
    concurrent_operations: int = 0

    # Success metrics
    success_rate_percent: float = 100.0
    error_rate_percent: float = 0.0
    timeout_rate_percent: float = 0.0

    # Resource efficiency
    cpu_efficiency_percent: float = 0.0
    memory_efficiency_percent: float = 0.0
    network_efficiency_percent: float = 0.0

    # Timestamp
    measurement_time: datetime = field(default_factory=datetime.now)
    measurement_window_seconds: int = 60

    def __post_init__(self):
        """Run the range checks: timings, throughput, percentages, window."""
        for elapsed_ms in (
            self.average_response_time_ms,
            self.p95_response_time_ms,
            self.p99_response_time_ms,
            self.max_response_time_ms,
        ):
            if elapsed_ms < 0:
                raise ValueError("Timing metrics cannot be negative")

        throughput_values = (
            self.requests_per_second,
            self.operations_per_minute,
            self.concurrent_operations,
        )
        if any(value < 0 for value in throughput_values):
            raise ValueError("Throughput metrics cannot be negative")

        for metric in (
            self.success_rate_percent,
            self.error_rate_percent,
            self.timeout_rate_percent,
            self.cpu_efficiency_percent,
            self.memory_efficiency_percent,
            self.network_efficiency_percent,
        ):
            # Written as a negated chained comparison so NaN is rejected too.
            if not (0 <= metric <= 100):
                raise ValueError(
                    f"Percentage metric {metric} must be in range [0, 100]"
                )

        if self.measurement_window_seconds <= 0:
            raise ValueError("Measurement window must be positive")

    def get_overall_efficiency(self) -> float:
        """Return the mean of the CPU, memory and network efficiency."""
        combined = (
            self.cpu_efficiency_percent
            + self.memory_efficiency_percent
            + self.network_efficiency_percent
        )
        return combined / 3

    def is_performance_degraded(self) -> bool:
        """Return True when any degradation threshold is crossed.

        Thresholds: success rate below 95%, error rate above 5%, average
        response time above 5000 ms, or overall efficiency below 70%.
        """
        if self.success_rate_percent < 95.0:
            return True
        if self.error_rate_percent > 5.0:
            return True
        if self.average_response_time_ms > 5000.0:
            return True
        return self.get_overall_efficiency() < 70.0
@dataclass(frozen=True)
class ResourceAllocation:
    """Immutable resource allocation limits and current allocation figures.

    Construction raises ``ValueError`` when any of these rules is broken:

    - every limit (memory, CPU, disk, network, processes, threads) is > 0
    - no allocated amount and no active-process count is negative
    - no allocated amount exceeds its limit, and active processes do not
      exceed ``max_processes``
    - every usage percentage lies in ``[0, 100]``
    - ``cpu_limit_percent`` lies in ``(0, 100]``
    """

    # Memory allocation
    memory_limit_mb: int = 1024
    memory_allocated_mb: int = 0
    memory_usage_percent: float = 0.0

    # CPU allocation
    cpu_limit_percent: float = 50.0
    cpu_allocated_percent: float = 0.0
    cpu_usage_percent: float = 0.0

    # Disk allocation
    disk_limit_mb: int = 10240
    disk_allocated_mb: int = 0
    disk_usage_percent: float = 0.0

    # Network allocation
    network_limit_mbps: float = 100.0
    network_allocated_mbps: float = 0.0
    network_usage_percent: float = 0.0

    # Process limits
    max_processes: int = 10
    active_processes: int = 0
    max_threads_per_process: int = 20

    # Allocation metadata
    allocation_time: datetime = field(default_factory=datetime.now)
    allocation_policy: str = "balanced"

    def __post_init__(self):
        """Validate the allocation; raise ValueError on the first violation."""
        # Validate limits are positive
        limits = (
            self.memory_limit_mb,
            self.cpu_limit_percent,
            self.disk_limit_mb,
            self.network_limit_mbps,
            self.max_processes,
            self.max_threads_per_process,
        )
        if any(limit <= 0 for limit in limits):
            raise ValueError("Resource limits must be positive")

        # A negative allocation would pass the "does not exceed limit" checks
        # below and make the available amount larger than the limit itself.
        allocated = (
            self.memory_allocated_mb,
            self.cpu_allocated_percent,
            self.disk_allocated_mb,
            self.network_allocated_mbps,
            self.active_processes,
        )
        if any(amount < 0 for amount in allocated):
            raise ValueError("Allocated resources cannot be negative")

        # Validate allocated resources don't exceed limits
        if self.memory_allocated_mb > self.memory_limit_mb:
            raise ValueError("Memory allocation exceeds limit")
        if self.cpu_allocated_percent > self.cpu_limit_percent:
            raise ValueError("CPU allocation exceeds limit")
        if self.disk_allocated_mb > self.disk_limit_mb:
            raise ValueError("Disk allocation exceeds limit")
        if self.network_allocated_mbps > self.network_limit_mbps:
            raise ValueError("Network allocation exceeds limit")
        if self.active_processes > self.max_processes:
            raise ValueError("Active processes exceed limit")

        # Validate percentages
        for metric in (
            self.memory_usage_percent,
            self.cpu_usage_percent,
            self.disk_usage_percent,
            self.network_usage_percent,
        ):
            if not (0 <= metric <= 100):
                raise ValueError(f"Usage percentage {metric} must be in range [0, 100]")

        # Validate CPU limit
        if not (0 < self.cpu_limit_percent <= 100):
            raise ValueError("CPU limit must be in range (0, 100]")

    def get_memory_available_mb(self) -> int:
        """Return the memory limit minus the allocated memory, in MB."""
        return self.memory_limit_mb - self.memory_allocated_mb

    def get_cpu_available_percent(self) -> float:
        """Return the CPU limit minus the allocated CPU, in percent."""
        return self.cpu_limit_percent - self.cpu_allocated_percent

    def get_disk_available_mb(self) -> int:
        """Return the disk limit minus the allocated disk space, in MB."""
        return self.disk_limit_mb - self.disk_allocated_mb

    def get_network_available_mbps(self) -> float:
        """Return the network limit minus the allocated bandwidth, in Mbps."""
        return self.network_limit_mbps - self.network_allocated_mbps

    def is_resource_constrained(self) -> bool:
        """Return True when any resource is heavily constrained.

        That is the case when any usage percentage is above 90, or when
        the number of active processes has reached ``max_processes``.
        """
        return (
            self.memory_usage_percent > 90.0
            or self.cpu_usage_percent > 90.0
            or self.disk_usage_percent > 90.0
            or self.network_usage_percent > 90.0
            or self.active_processes >= self.max_processes
        )

    def can_allocate_memory(self, required_mb: int) -> bool:
        """Return True when ``required_mb`` fits in the available memory."""
        return self.get_memory_available_mb() >= required_mb

    def can_allocate_cpu(self, required_percent: float) -> bool:
        """Return True when ``required_percent`` fits in the available CPU."""
        return self.get_cpu_available_percent() >= required_percent
def validate_health_consistency(health: ComponentHealth) -> None:
    """Check that a component's health fields agree with each other.

    Checks, in order:

    1. EMERGENCY status must come with at least one recorded error.
    2. HEALTHY status must not come with critical alerts.
    3. The last error and last warning times must not lie in the future.
    4. An EMERGENCY-level resource alert requires CRITICAL or EMERGENCY
       health status.

    Args:
        health: Component health to validate.

    Raises:
        HealthCheckError: On the first inconsistency found; the error is
            tagged with the component's type.
    """
    status = health.health_status
    component = health.component_type

    # Status consistency
    if status == HealthStatus.EMERGENCY and health.error_count == 0:
        raise HealthCheckError(
            "Emergency health status requires error count > 0", component
        )
    if status == HealthStatus.HEALTHY and health.get_critical_alerts():
        raise HealthCheckError(
            "Healthy status inconsistent with critical alerts", component
        )

    # Time consistency: unset timestamps are skipped.
    timestamp_rules = (
        (health.last_error_time, "Last error time cannot be in the future"),
        (health.last_warning_time, "Last warning time cannot be in the future"),
    )
    for timestamp, complaint in timestamp_rules:
        if timestamp and timestamp > datetime.now():
            raise HealthCheckError(complaint, component)

    # Resource alert consistency
    escalated_statuses = (HealthStatus.CRITICAL, HealthStatus.EMERGENCY)
    has_unescalated_emergency = any(
        alert.alert_level == AlertLevel.EMERGENCY and status not in escalated_statuses
        for alert in health.resource_alerts
    )
    if has_unescalated_emergency:
        raise HealthCheckError(
            "Emergency alerts require critical or emergency health status",
            component,
        )