"""
Resource Limits and Performance Tracking Types - Agent Orchestration Platform
Architecture Integration:
- Design Patterns: Resource Monitor pattern with real-time tracking
- Security Model: Resource boundaries with enforcement and alerts
- Performance Profile: O(1) resource checks with efficient monitoring
Technical Decisions:
- Resource Tracking: Comprehensive system resource monitoring
- Limit Enforcement: Proactive limits with graceful degradation
- Performance Metrics: Real-time collection with historical analysis
- Alert System: Threshold-based notifications with escalation
Dependencies & Integration:
- External: psutil (per-process metrics collection); otherwise stdlib only
- Internal: Foundation for agent and session resource management
Quality Assurance:
- Test Coverage: Property-based testing for all resource operations
- Error Handling: Graceful degradation with comprehensive logging
Author: Adder_3 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import math
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union

import psutil
# ============================================================================
# RESOURCE ENUMERATION - Resource Types and Priorities
# ============================================================================
class ResourceType(Enum):
    """Kinds of system resource that limits and usage samples refer to.

    The member values are the lowercase identifiers that this module embeds
    in error messages and alert IDs.
    """

    CPU = "cpu"
    MEMORY = "memory"
    DISK_IO = "disk_io"
    NETWORK_IO = "network_io"
    FILE_DESCRIPTORS = "file_descriptors"
    PROCESSES = "processes"
    THREADS = "threads"
    FILESYSTEM_SIZE = "filesystem_size"
class ResourcePriority(Enum):
    """Relative importance attached to a resource limit.

    Members are declared from least to most important. The string values are
    labels only and do not sort in priority order.
    """

    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    CRITICAL = "critical"
class AlertLevel(Enum):
    """Severity of a resource alert, declared from least to most severe.

    The per-member notes give the thresholds that
    ``ResourceLimit.get_alert_level`` applies. The string values do NOT sort
    by severity ("warning" > "emergency" lexicographically), so rank levels
    explicitly rather than comparing ``.value``.
    """

    INFO = "info"  # at or above 80% of the soft limit
    WARNING = "warning"  # at or above the soft limit
    CRITICAL = "critical"  # at or above 90% of the hard limit
    EMERGENCY = "emergency"  # at or above the hard limit
# ============================================================================
# RESOURCE EXCEPTIONS - Typed Error Handling
# ============================================================================
class ResourceError(Exception):
    """Root of the resource exception hierarchy.

    The rendered exception text has the form
    ``[<error_code>] <resource type value>: <message>``.

    Attributes:
        resource_type: Resource the failure relates to.
        error_code: Short code identifying the failure category.
    """

    def __init__(
        self,
        message: str,
        resource_type: ResourceType,
        error_code: str = "RESOURCE_ERROR",
    ):
        rendered = f"[{error_code}] {resource_type.value}: {message}"
        super().__init__(rendered)
        self.resource_type = resource_type
        self.error_code = error_code
class ResourceLimitExceededError(ResourceError):
    """Raised or collected when a measured value is above its limit.

    Attributes:
        current_value: The measured value that broke the limit.
        limit_value: The limit that was broken.
    """

    def __init__(
        self, resource_type: ResourceType, current_value: float, limit_value: float
    ):
        super().__init__(
            f"Limit exceeded: {current_value} > {limit_value}",
            resource_type,
            "LIMIT_EXCEEDED",
        )
        self.current_value = current_value
        self.limit_value = limit_value
class ResourceUnavailableError(ResourceError):
    """Raised when a resource cannot be measured or accessed.

    Note the argument order: resource type first, then the message.
    """

    def __init__(self, resource_type: ResourceType, message: str):
        super().__init__(
            message,
            resource_type,
            error_code="RESOURCE_UNAVAILABLE",
        )
# ============================================================================
# RESOURCE LIMITS - Comprehensive Resource Constraints
# ============================================================================
@dataclass(frozen=True)
class ResourceLimit:
    """
    Immutable soft/hard limit for one resource type, validated on creation.

    Contracts:
        Invariants:
            - Soft limit <= hard limit
            - Both limits are positive and are not NaN
            - The unit is one of those accepted for the resource type
            - The grace period is non-negative

    Raises:
        ValueError: From ``__post_init__`` when any invariant is violated.
    """

    resource_type: ResourceType
    soft_limit: float  # Warning threshold
    hard_limit: float  # Enforcement threshold
    unit: str  # Unit of measurement (%, MB, count, etc.)
    priority: ResourcePriority = ResourcePriority.NORMAL
    enforcement_enabled: bool = True
    grace_period_seconds: int = 30  # Grace period before enforcement

    # Units accepted per resource type. Deliberately left unannotated so the
    # dataclass machinery treats it as a shared class attribute rather than a
    # field, and so the table is built once instead of per instance.
    _VALID_UNITS = {
        ResourceType.CPU: {"%", "percent"},
        ResourceType.MEMORY: {"MB", "GB", "bytes", "%", "percent"},
        ResourceType.DISK_IO: {"MB/s", "bytes/s", "IOPS"},
        ResourceType.NETWORK_IO: {"MB/s", "bytes/s", "packets/s"},
        ResourceType.FILE_DESCRIPTORS: {"count"},
        ResourceType.PROCESSES: {"count"},
        ResourceType.THREADS: {"count"},
        ResourceType.FILESYSTEM_SIZE: {"MB", "GB", "bytes"},
    }

    def __post_init__(self):
        """Validate resource limit configuration."""
        # NaN compares False against everything, so without the explicit
        # isnan test a NaN limit would pass validation and then never trigger
        # any alert or enforcement.
        if math.isnan(self.soft_limit) or self.soft_limit <= 0:
            raise ValueError(f"Soft limit must be positive: {self.soft_limit}")
        if math.isnan(self.hard_limit) or self.hard_limit <= 0:
            raise ValueError(f"Hard limit must be positive: {self.hard_limit}")
        if self.soft_limit > self.hard_limit:
            raise ValueError(
                f"Soft limit {self.soft_limit} cannot exceed hard limit {self.hard_limit}"
            )
        if self.grace_period_seconds < 0:
            raise ValueError(
                f"Grace period cannot be negative: {self.grace_period_seconds}"
            )

        # Resource types missing from the table are accepted with any unit.
        allowed_units = self._VALID_UNITS.get(self.resource_type)
        if allowed_units is not None and self.unit not in allowed_units:
            raise ValueError(
                f"Invalid unit {self.unit} for {self.resource_type.value}"
            )

    def get_warning_threshold(self) -> float:
        """
        Calculate the early-warning threshold (80% of the soft limit).

        Returns:
            float: Warning threshold value
        """
        return self.soft_limit * 0.8

    def get_critical_threshold(self) -> float:
        """
        Calculate the critical threshold (90% of the hard limit).

        Returns:
            float: Critical threshold value
        """
        return self.hard_limit * 0.9

    def get_alert_level(self, current_value: float) -> Optional[AlertLevel]:
        """
        Determine alert level based on current resource usage.

        Checks run from most to least severe, so the first threshold reached
        wins: hard limit, 90% of hard limit, soft limit, 80% of soft limit.

        Args:
            current_value: Current resource usage value

        Returns:
            Optional[AlertLevel]: Alert level or None if below every threshold
        """
        if current_value >= self.hard_limit:
            return AlertLevel.EMERGENCY
        if current_value >= self.get_critical_threshold():
            return AlertLevel.CRITICAL
        if current_value >= self.soft_limit:
            return AlertLevel.WARNING
        if current_value >= self.get_warning_threshold():
            return AlertLevel.INFO
        return None

    def is_exceeded(self, current_value: float, use_soft_limit: bool = False) -> bool:
        """
        Check if resource limit is exceeded.

        The comparison is strict: a value exactly equal to the limit is not
        considered exceeded.

        Args:
            current_value: Current resource usage
            use_soft_limit: Whether to use soft limit instead of hard limit

        Returns:
            bool: True if limit is exceeded
        """
        threshold = self.soft_limit if use_soft_limit else self.hard_limit
        return current_value > threshold
@dataclass(frozen=True)
class ResourceLimits:
    """
    Immutable bundle holding exactly one limit per resource type.

    Contracts:
        Invariants:
            - No two slots carry the same resource type
            - Each slot holds a limit whose resource type matches the slot

    Raises:
        ValueError: From ``__post_init__`` when an invariant is violated.
    """

    cpu_limit: ResourceLimit
    memory_limit: ResourceLimit
    disk_io_limit: ResourceLimit
    network_io_limit: ResourceLimit
    file_descriptor_limit: ResourceLimit
    process_limit: ResourceLimit
    thread_limit: ResourceLimit
    filesystem_size_limit: ResourceLimit

    def _typed_slots(self):
        """Pair each slot's expected resource type with its limit, in field order."""
        return [
            (ResourceType.CPU, self.cpu_limit),
            (ResourceType.MEMORY, self.memory_limit),
            (ResourceType.DISK_IO, self.disk_io_limit),
            (ResourceType.NETWORK_IO, self.network_io_limit),
            (ResourceType.FILE_DESCRIPTORS, self.file_descriptor_limit),
            (ResourceType.PROCESSES, self.process_limit),
            (ResourceType.THREADS, self.thread_limit),
            (ResourceType.FILESYSTEM_SIZE, self.filesystem_size_limit),
        ]

    def __post_init__(self):
        """Validate resource limits collection."""
        slots = self._typed_slots()

        # Duplicate detection runs first so that supplying the same resource
        # type twice is reported as a duplicate rather than as a mismatch.
        declared_types = [limit.resource_type for _, limit in slots]
        if len(set(declared_types)) != len(declared_types):
            raise ValueError("Duplicate resource types in limits")

        for expected_type, limit in slots:
            if limit.resource_type != expected_type:
                raise ValueError(
                    f"Resource limit type mismatch: {limit.resource_type} != {expected_type}"
                )

    def get_limit_by_type(self, resource_type: ResourceType) -> ResourceLimit:
        """
        Look up the limit stored for a resource type.

        Args:
            resource_type: Type of resource

        Returns:
            ResourceLimit: Corresponding resource limit

        Raises:
            ValueError: If resource type not found
        """
        by_type = dict(self._typed_slots())
        found = by_type.get(resource_type)
        if found is None:
            raise ValueError(f"Unknown resource type: {resource_type}")
        return found

    def get_all_limits(self) -> List[ResourceLimit]:
        """
        Return every limit in field declaration order.

        Returns:
            List[ResourceLimit]: All resource limits
        """
        return [limit for _, limit in self._typed_slots()]

    def get_critical_limits(self) -> List[ResourceLimit]:
        """
        Return only the limits whose priority is CRITICAL.

        Returns:
            List[ResourceLimit]: Critical priority limits
        """
        critical = []
        for limit in self.get_all_limits():
            if limit.priority == ResourcePriority.CRITICAL:
                critical.append(limit)
        return critical
# ============================================================================
# PERFORMANCE METRICS - Real-time Resource Monitoring
# ============================================================================
@dataclass(frozen=True)
class ResourceUsage:
    """
    Immutable measurement of one resource type at a point in time.

    Contracts:
        Invariants:
            - current, peak, average and measurement duration are non-negative
            - peak_value >= current_value

    NOTE(review): average_value is not compared against peak_value; only the
    peak >= current relationship is enforced.

    Raises:
        ValueError: From ``__post_init__`` when an invariant is violated.
    """

    resource_type: ResourceType
    current_value: float
    peak_value: float
    average_value: float
    unit: str
    timestamp: datetime = field(default_factory=datetime.now)
    measurement_duration_seconds: float = 0.0

    def __post_init__(self):
        """Validate resource usage measurement."""
        non_negative_fields = (
            ("Current value", self.current_value),
            ("Peak value", self.peak_value),
            ("Average value", self.average_value),
            ("Measurement duration", self.measurement_duration_seconds),
        )
        for label, value in non_negative_fields:
            if value < 0:
                raise ValueError(f"{label} cannot be negative: {value}")

        # A peak below the current reading is self-contradictory.
        if self.current_value > self.peak_value:
            raise ValueError(
                f"Peak value {self.peak_value} cannot be less than current {self.current_value}"
            )

    def get_utilization_percentage(self, limit: ResourceLimit) -> float:
        """
        Express the current value as a percentage of the limit's hard limit.

        Args:
            limit: Resource limit for calculation

        Returns:
            float: Utilization percentage (may exceed 100)

        Raises:
            ValueError: If the limit is for a different resource type
        """
        if self.resource_type != limit.resource_type:
            raise ValueError(
                f"Resource type mismatch: {self.resource_type} vs {limit.resource_type}"
            )
        fraction_of_hard_limit = self.current_value / limit.hard_limit
        return fraction_of_hard_limit * 100

    def is_trending_up(self, previous_usage: "ResourceUsage") -> bool:
        """
        Report whether the current value is above the previous one.

        Args:
            previous_usage: Previous resource usage measurement

        Returns:
            bool: True if usage is increasing

        Raises:
            ValueError: If the measurements are for different resource types
        """
        if self.resource_type != previous_usage.resource_type:
            raise ValueError("Cannot compare different resource types")
        return previous_usage.current_value < self.current_value

    def get_change_rate(self, previous_usage: "ResourceUsage") -> float:
        """
        Calculate the per-second change in value since a previous measurement.

        Args:
            previous_usage: Previous resource usage measurement

        Returns:
            float: Rate of change per second; 0.0 when this measurement is
                not strictly later than the previous one

        Raises:
            ValueError: If the measurements are for different resource types
        """
        if self.resource_type != previous_usage.resource_type:
            raise ValueError("Cannot compare different resource types")

        elapsed_seconds = (self.timestamp - previous_usage.timestamp).total_seconds()
        if not elapsed_seconds > 0:
            return 0.0

        delta = self.current_value - previous_usage.current_value
        return delta / elapsed_seconds
@dataclass(frozen=True)
class PerformanceSnapshot:
    """
    Immutable set of resource usage measurements taken together.

    Every usage field is optional; a field left as None means that resource
    was not measured for this snapshot.

    Contracts:
        Invariants:
            - Every supplied usage has a timestamp within 5 seconds of the
              snapshot timestamp

    Raises:
        ValueError: From ``__post_init__`` when a usage timestamp is more than
            5 seconds away from the snapshot timestamp.
    """

    process_id: Optional[int]
    timestamp: datetime = field(default_factory=datetime.now)
    cpu_usage: Optional[ResourceUsage] = None
    memory_usage: Optional[ResourceUsage] = None
    disk_io_usage: Optional[ResourceUsage] = None
    network_io_usage: Optional[ResourceUsage] = None
    file_descriptor_usage: Optional[ResourceUsage] = None
    process_count: Optional[ResourceUsage] = None
    thread_count: Optional[ResourceUsage] = None
    filesystem_usage: Optional[ResourceUsage] = None

    def __post_init__(self):
        """Validate performance snapshot consistency."""
        # Validate timestamp consistency (within 5 seconds)
        for usage in self.get_all_usages():
            time_diff = abs((usage.timestamp - self.timestamp).total_seconds())
            if time_diff > 5.0:
                raise ValueError(
                    f"Usage timestamp inconsistent: {time_diff} seconds difference"
                )

    def get_all_usages(self) -> List[ResourceUsage]:
        """
        Get all non-None resource usage measurements, in field order.

        Returns:
            List[ResourceUsage]: All available usage measurements
        """
        candidates = (
            self.cpu_usage,
            self.memory_usage,
            self.disk_io_usage,
            self.network_io_usage,
            self.file_descriptor_usage,
            self.process_count,
            self.thread_count,
            self.filesystem_usage,
        )
        return [usage for usage in candidates if usage is not None]

    def get_usage_by_type(self, resource_type: ResourceType) -> Optional[ResourceUsage]:
        """
        Get resource usage by type.

        Args:
            resource_type: Type of resource

        Returns:
            Optional[ResourceUsage]: Usage measurement if available, else None
        """
        usage_map = {
            ResourceType.CPU: self.cpu_usage,
            ResourceType.MEMORY: self.memory_usage,
            ResourceType.DISK_IO: self.disk_io_usage,
            ResourceType.NETWORK_IO: self.network_io_usage,
            ResourceType.FILE_DESCRIPTORS: self.file_descriptor_usage,
            ResourceType.PROCESSES: self.process_count,
            ResourceType.THREADS: self.thread_count,
            ResourceType.FILESYSTEM_SIZE: self.filesystem_usage,
        }
        return usage_map.get(resource_type)

    def check_limits(self, limits: ResourceLimits) -> List[ResourceLimitExceededError]:
        """
        Check all resource usages against their hard limits.

        The violations are returned, not raised.

        Args:
            limits: Resource limits to check against

        Returns:
            List[ResourceLimitExceededError]: One entry per usage whose
                current value is strictly above the hard limit
        """
        violations = []
        for usage in self.get_all_usages():
            try:
                limit = limits.get_limit_by_type(usage.resource_type)
            except ValueError:
                # Resource type not in limits - skip
                continue
            if limit.is_exceeded(usage.current_value):
                violations.append(
                    ResourceLimitExceededError(
                        usage.resource_type, usage.current_value, limit.hard_limit
                    )
                )
        return violations

    def get_health_status(self, limits: ResourceLimits) -> str:
        """
        Get overall health status: the most severe alert level across usages.

        Args:
            limits: Resource limits for evaluation

        Returns:
            str: "healthy" when no usage reaches any threshold, otherwise the
                value of the worst alert level ("info", "warning", "critical"
                or "emergency")
        """
        # Rank levels explicitly. Comparing the enum's string values orders
        # them alphabetically, which would put "warning" above "emergency".
        severity_rank = {
            AlertLevel.INFO: 1,
            AlertLevel.WARNING: 2,
            AlertLevel.CRITICAL: 3,
            AlertLevel.EMERGENCY: 4,
        }

        worst_level = None
        for usage in self.get_all_usages():
            try:
                limit = limits.get_limit_by_type(usage.resource_type)
            except ValueError:
                # Resource type not in limits - skip
                continue
            alert_level = limit.get_alert_level(usage.current_value)
            if alert_level is None:
                continue
            if (
                worst_level is None
                or severity_rank[alert_level] > severity_rank[worst_level]
            ):
                worst_level = alert_level

        return "healthy" if worst_level is None else worst_level.value
# ============================================================================
# RESOURCE MONITORING - Real-time Tracking and Collection
# ============================================================================
@dataclass(frozen=True)
class ResourceAlert:
    """
    Immutable record of a single threshold alert.

    Contracts:
        Invariants:
            - alert_id and message are non-blank
            - current_value is non-negative
            - threshold_value is strictly positive

    Raises:
        ValueError: From ``__post_init__`` when an invariant is violated.
    """

    alert_id: str
    timestamp: datetime
    resource_type: ResourceType
    alert_level: AlertLevel
    current_value: float
    threshold_value: float
    message: str
    process_id: Optional[int] = None
    agent_name: Optional[str] = None

    def __post_init__(self):
        """Validate resource alert structure."""
        if not (self.alert_id and self.alert_id.strip()):
            raise ValueError("Alert ID cannot be empty")
        if not (self.message and self.message.strip()):
            raise ValueError("Alert message cannot be empty")
        if self.current_value < 0:
            raise ValueError(f"Current value cannot be negative: {self.current_value}")
        if self.threshold_value <= 0:
            raise ValueError(
                f"Threshold value must be positive: {self.threshold_value}"
            )

    def is_critical_or_emergency(self) -> bool:
        """Return True when the alert level is CRITICAL or EMERGENCY."""
        severe_levels = [AlertLevel.CRITICAL, AlertLevel.EMERGENCY]
        return self.alert_level in severe_levels

    def get_severity_score(self) -> int:
        """
        Map the alert level to a number for prioritization.

        Returns:
            int: Severity score (1-4, higher is more severe)
        """
        ascending_levels = (
            AlertLevel.INFO,
            AlertLevel.WARNING,
            AlertLevel.CRITICAL,
            AlertLevel.EMERGENCY,
        )
        score_by_level = {
            level: score for score, level in enumerate(ascending_levels, start=1)
        }
        return score_by_level[self.alert_level]
class ResourceMonitor:
    """
    Collects per-process resource metrics via psutil and raises alerts.

    Each call to ``collect_process_metrics`` samples one process, compares
    the readings against the configured limits, records an alert for every
    reading that reaches a threshold, and invokes the optional callback once
    per alert.

    Attributes:
        limits: Limits that readings are checked against.
        alert_callback: Optional callable invoked with each new alert.
        last_snapshot: Most recent successfully collected snapshot, if any.
        alert_history: Alerts in creation order, trimmed to the newest
            ``max_alert_history`` entries.
        max_alert_history: Maximum number of alerts retained.
    """

    def __init__(
        self,
        limits: ResourceLimits,
        alert_callback: Optional[Callable[[ResourceAlert], None]] = None,
    ):
        """
        Initialize resource monitor with limits and alert handling.

        Args:
            limits: Resource limits for monitoring
            alert_callback: Optional callback for alert notifications
        """
        self.limits = limits
        self.alert_callback = alert_callback
        self.last_snapshot: Optional[PerformanceSnapshot] = None
        self.alert_history: List[ResourceAlert] = []
        self.max_alert_history = 1000
        # Per-monitor counter appended to alert IDs so that alerts created
        # within the same second still get distinct IDs.
        self._alert_sequence = 0

    @staticmethod
    def _read_io_bytes(process) -> Optional[int]:
        """Return cumulative read+write bytes for the process, or None if unavailable."""
        try:
            counters = process.io_counters()
            return counters.read_bytes + counters.write_bytes
        except (psutil.AccessDenied, AttributeError):
            # AttributeError covers platforms where psutil does not provide
            # io_counters at all.
            return None

    def collect_process_metrics(self, process_id: int) -> PerformanceSnapshot:
        """
        Collect resource metrics for a specific process.

        Blocks for roughly 0.1 seconds while CPU usage is sampled. The I/O
        counters are read immediately before and after that window so disk
        I/O can be reported as a rate in MB/s, the unit the disk I/O limits
        are expressed in.

        Args:
            process_id: Process ID to monitor

        Returns:
            PerformanceSnapshot: Current resource usage; disk I/O, file
                descriptor and thread readings are None when unavailable

        Raises:
            ResourceUnavailableError: If the process does not exist, access
                is denied, or any other failure occurs during collection or
                alerting (the original exception is chained as the cause)
        """
        try:
            process = psutil.Process(process_id)
            timestamp = datetime.now()

            # Collect CPU usage, bracketing the sampling window with I/O
            # counter reads so an I/O rate can be derived from the same window.
            io_bytes_before = self._read_io_bytes(process)
            window_start = time.monotonic()
            cpu_percent = process.cpu_percent(interval=0.1)
            window_seconds = time.monotonic() - window_start
            io_bytes_after = self._read_io_bytes(process)

            cpu_usage = ResourceUsage(
                resource_type=ResourceType.CPU,
                current_value=cpu_percent,
                peak_value=cpu_percent,  # Would track over time in real implementation
                average_value=cpu_percent,
                unit="%",
                timestamp=timestamp,
            )

            # Collect memory usage (resident set size, in MB)
            memory_info = process.memory_info()
            memory_mb = memory_info.rss / (1024 * 1024)
            memory_usage = ResourceUsage(
                resource_type=ResourceType.MEMORY,
                current_value=memory_mb,
                peak_value=memory_mb,
                average_value=memory_mb,
                unit="MB",
                timestamp=timestamp,
            )

            # Derive disk I/O throughput over the sampling window
            if (
                io_bytes_before is None
                or io_bytes_after is None
                or window_seconds <= 0
            ):
                disk_io_usage = None
            else:
                # Clamp at zero: usage values must be non-negative.
                transferred_bytes = max(io_bytes_after - io_bytes_before, 0)
                disk_io_rate = transferred_bytes / (1024 * 1024) / window_seconds
                disk_io_usage = ResourceUsage(
                    resource_type=ResourceType.DISK_IO,
                    current_value=disk_io_rate,
                    peak_value=disk_io_rate,
                    average_value=disk_io_rate,
                    unit="MB/s",
                    timestamp=timestamp,
                    measurement_duration_seconds=window_seconds,
                )

            # Collect file descriptor usage
            try:
                num_fds = process.num_fds()
                fd_usage = ResourceUsage(
                    resource_type=ResourceType.FILE_DESCRIPTORS,
                    current_value=float(num_fds),
                    peak_value=float(num_fds),
                    average_value=float(num_fds),
                    unit="count",
                    timestamp=timestamp,
                )
            except (psutil.AccessDenied, AttributeError):
                fd_usage = None

            # Collect thread count
            try:
                num_threads = process.num_threads()
                thread_usage = ResourceUsage(
                    resource_type=ResourceType.THREADS,
                    current_value=float(num_threads),
                    peak_value=float(num_threads),
                    average_value=float(num_threads),
                    unit="count",
                    timestamp=timestamp,
                )
            except (psutil.AccessDenied, AttributeError):
                thread_usage = None

            snapshot = PerformanceSnapshot(
                process_id=process_id,
                timestamp=timestamp,
                cpu_usage=cpu_usage,
                memory_usage=memory_usage,
                disk_io_usage=disk_io_usage,
                file_descriptor_usage=fd_usage,
                thread_count=thread_usage,
            )

            # Check for limit violations and generate alerts
            self._check_and_alert(snapshot)
            self.last_snapshot = snapshot
            return snapshot
        except psutil.NoSuchProcess as exc:
            raise ResourceUnavailableError(
                ResourceType.PROCESSES, f"Process {process_id} not found"
            ) from exc
        except psutil.AccessDenied as exc:
            raise ResourceUnavailableError(
                ResourceType.PROCESSES, f"Access denied to process {process_id}"
            ) from exc
        except Exception as exc:
            # Boundary handler: callers are promised a single exception type.
            raise ResourceUnavailableError(
                ResourceType.PROCESSES,
                f"Failed to collect metrics for process {process_id}: {exc}",
            ) from exc

    def _check_and_alert(self, snapshot: PerformanceSnapshot) -> None:
        """
        Check resource usage against limits and generate alerts.

        For every usage that reaches a threshold, an alert is appended to
        ``alert_history`` (trimmed to ``max_alert_history``) and passed to
        ``alert_callback`` when one is set. Exceptions raised by the callback
        propagate to the caller.

        Args:
            snapshot: Performance snapshot to check
        """
        for usage in snapshot.get_all_usages():
            # Only the lookup is guarded, so a ValueError raised later (for
            # example by the callback) is not silently swallowed.
            try:
                limit = self.limits.get_limit_by_type(usage.resource_type)
            except ValueError:
                # Resource type not in limits - skip
                continue

            alert_level = limit.get_alert_level(usage.current_value)
            if alert_level is None:
                continue

            # Report the threshold that was actually crossed for this level.
            crossed_threshold = {
                AlertLevel.INFO: limit.get_warning_threshold(),
                AlertLevel.WARNING: limit.soft_limit,
                AlertLevel.CRITICAL: limit.get_critical_threshold(),
                AlertLevel.EMERGENCY: limit.hard_limit,
            }[alert_level]

            self._alert_sequence += 1
            alert = ResourceAlert(
                alert_id=(
                    f"{usage.resource_type.value}_{int(time.time())}"
                    f"_{self._alert_sequence}"
                ),
                timestamp=datetime.now(),
                resource_type=usage.resource_type,
                alert_level=alert_level,
                current_value=usage.current_value,
                threshold_value=crossed_threshold,
                message=f"{usage.resource_type.value} usage {usage.current_value} {usage.unit} exceeds {alert_level.value} threshold",
                process_id=snapshot.process_id,
            )

            # Add to history, dropping the oldest entries beyond the cap
            self.alert_history.append(alert)
            overflow = len(self.alert_history) - self.max_alert_history
            if overflow > 0:
                del self.alert_history[:overflow]

            # Trigger callback if available
            if self.alert_callback:
                self.alert_callback(alert)

    def get_recent_alerts(self, minutes: int = 10) -> List[ResourceAlert]:
        """
        Get alerts from the last N minutes.

        Args:
            minutes: Number of minutes to look back

        Returns:
            List[ResourceAlert]: Alerts whose timestamp is at or after the cutoff
        """
        cutoff_time = datetime.now() - timedelta(minutes=minutes)
        return [alert for alert in self.alert_history if alert.timestamp >= cutoff_time]

    def get_critical_alerts(self) -> List[ResourceAlert]:
        """
        Get all critical and emergency alerts still held in the history.

        Returns:
            List[ResourceAlert]: Critical and emergency alerts
        """
        return [
            alert for alert in self.alert_history if alert.is_critical_or_emergency()
        ]
# ============================================================================
# RESOURCE CONFIGURATION - Default and Preset Configurations
# ============================================================================
def create_default_resource_limits() -> ResourceLimits:
    """
    Build the default limit preset.

    Every limit uses the ResourceLimit defaults for enforcement and grace
    period; only type, soft/hard values, unit and priority are set here.

    Returns:
        ResourceLimits: Default resource limits
    """

    def build(kind, soft, hard, unit, priority):
        # Thin keyword-argument wrapper to keep the table below compact.
        return ResourceLimit(
            resource_type=kind,
            soft_limit=soft,
            hard_limit=hard,
            unit=unit,
            priority=priority,
        )

    cpu = build(ResourceType.CPU, 25.0, 50.0, "%", ResourcePriority.HIGH)
    memory = build(ResourceType.MEMORY, 512.0, 1024.0, "MB", ResourcePriority.CRITICAL)
    disk_io = build(ResourceType.DISK_IO, 50.0, 100.0, "MB/s", ResourcePriority.NORMAL)
    network_io = build(
        ResourceType.NETWORK_IO, 10.0, 25.0, "MB/s", ResourcePriority.LOW
    )
    file_descriptors = build(
        ResourceType.FILE_DESCRIPTORS, 500.0, 1000.0, "count", ResourcePriority.NORMAL
    )
    processes = build(ResourceType.PROCESSES, 1.0, 2.0, "count", ResourcePriority.HIGH)
    threads = build(ResourceType.THREADS, 50.0, 100.0, "count", ResourcePriority.NORMAL)
    filesystem = build(
        ResourceType.FILESYSTEM_SIZE, 1024.0, 2048.0, "MB", ResourcePriority.LOW
    )

    return ResourceLimits(
        cpu_limit=cpu,
        memory_limit=memory,
        disk_io_limit=disk_io,
        network_io_limit=network_io,
        file_descriptor_limit=file_descriptors,
        process_limit=processes,
        thread_limit=threads,
        filesystem_size_limit=filesystem,
    )
def create_high_security_resource_limits() -> ResourceLimits:
    """
    Build the restrictive limit preset for high-security environments.

    The process limit has identical soft and hard values (1.0). The network
    I/O limit is created with enforcement disabled.

    Returns:
        ResourceLimits: High-security resource limits
    """

    def build(kind, soft, hard, unit, priority, **overrides):
        # Thin keyword-argument wrapper; overrides pass through any extra
        # ResourceLimit fields a particular entry needs.
        return ResourceLimit(
            resource_type=kind,
            soft_limit=soft,
            hard_limit=hard,
            unit=unit,
            priority=priority,
            **overrides,
        )

    cpu = build(ResourceType.CPU, 15.0, 25.0, "%", ResourcePriority.CRITICAL)
    memory = build(ResourceType.MEMORY, 256.0, 512.0, "MB", ResourcePriority.CRITICAL)
    disk_io = build(ResourceType.DISK_IO, 25.0, 50.0, "MB/s", ResourcePriority.HIGH)
    # NOTE(review): enforcement is switched off for network I/O in the
    # high-security preset - confirm this is intentional.
    network_io = build(
        ResourceType.NETWORK_IO,
        5.0,
        10.0,
        "MB/s",
        ResourcePriority.NORMAL,
        enforcement_enabled=False,
    )
    file_descriptors = build(
        ResourceType.FILE_DESCRIPTORS, 250.0, 500.0, "count", ResourcePriority.HIGH
    )
    processes = build(
        ResourceType.PROCESSES, 1.0, 1.0, "count", ResourcePriority.CRITICAL
    )
    threads = build(ResourceType.THREADS, 25.0, 50.0, "count", ResourcePriority.HIGH)
    filesystem = build(
        ResourceType.FILESYSTEM_SIZE, 512.0, 1024.0, "MB", ResourcePriority.NORMAL
    )

    return ResourceLimits(
        cpu_limit=cpu,
        memory_limit=memory,
        disk_io_limit=disk_io,
        network_io_limit=network_io,
        file_descriptor_limit=file_descriptors,
        process_limit=processes,
        thread_limit=threads,
        filesystem_size_limit=filesystem,
    )