"""Sensor Manager - TASK_65 Phase 2 Core IoT Engine.
Sensor data collection, processing, and event triggering with real-time analytics.
Provides comprehensive sensor lifecycle management and intelligent automation triggers.
Architecture: Data Collection + Real-Time Processing + Event Triggering + Analytics Engine
Performance targets: <50ms sensor readings, <100ms trigger evaluation, <200ms data processing
Intelligence: Pattern recognition, anomaly detection, predictive analytics, adaptive thresholds
"""
from __future__ import annotations
import asyncio
import contextlib
import statistics
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import Enum
from typing import TYPE_CHECKING, Any
from src.core.contracts import require
from src.core.either import Either
from src.core.iot_architecture import (
AutomationAction,
AutomationCondition,
IoTIntegrationError,
SensorId,
SensorReading,
SensorType,
)
if TYPE_CHECKING:
from collections.abc import Callable
class DataAggregationMethod(Enum):
"""Data aggregation methods."""
AVERAGE = "average"
MINIMUM = "minimum"
MAXIMUM = "maximum"
SUM = "sum"
COUNT = "count"
MEDIAN = "median"
MODE = "mode"
RANGE = "range"
STANDARD_DEVIATION = "standard_deviation"
PERCENTILE = "percentile"
class TriggerEvaluationMode(Enum):
"""Trigger evaluation modes."""
IMMEDIATE = "immediate"
BUFFERED = "buffered"
STATISTICAL = "statistical"
TREND_BASED = "trend_based"
ML_PREDICTION = "ml_prediction"
class AlertSeverity(Enum):
"""Alert severity levels."""
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
@dataclass
class SensorConfiguration:
"""Sensor configuration and metadata."""
sensor_id: SensorId
sensor_type: SensorType
sensor_name: str
# Data collection settings
collection_interval: int = 60 # seconds
data_retention_hours: int = 168 # 7 days
quality_threshold: float = 0.8
# Validation settings
min_value: float | None = None
max_value: float | None = None
expected_unit: str | None = None
# Processing settings
enable_smoothing: bool = True
smoothing_factor: float = 0.1
enable_anomaly_detection: bool = True
anomaly_threshold: float = 2.0 # standard deviations
# Trigger settings
enable_triggers: bool = True
trigger_evaluation_mode: TriggerEvaluationMode = TriggerEvaluationMode.IMMEDIATE
buffer_size: int = 10
# Location and metadata
location: str | None = None
room: str | None = None
device_id: str | None = None
installation_date: datetime = field(default_factory=lambda: datetime.now(UTC))
# Custom settings
custom_properties: dict[str, Any] = field(default_factory=dict)
tags: set[str] = field(default_factory=set)
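# A minimal configuration sketch (hedged: SensorId/SensorType construction is
# assumed from src.core.iot_architecture and the values are illustrative):
#
#     config = SensorConfiguration(
#         sensor_id=SensorId("living_room_temp"),
#         sensor_type=SensorType.TEMPERATURE,
#         sensor_name="Living Room Temperature",
#         collection_interval=30,  # poll every 30 seconds
#         min_value=-40.0,
#         max_value=85.0,
#         expected_unit="°C",
#         tags={"climate", "indoor"},
#     )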
@dataclass
class SensorStatistics:
"""Sensor data statistics."""
sensor_id: SensorId
# Basic statistics
count: int = 0
min_value: float | None = None
max_value: float | None = None
average: float | None = None
median: float | None = None
# Advanced statistics
standard_deviation: float | None = None
variance: float | None = None
percentile_25: float | None = None
percentile_75: float | None = None
percentile_95: float | None = None
# Time-based statistics
first_reading: datetime | None = None
last_reading: datetime | None = None
reading_frequency: float | None = None # readings per hour
# Quality metrics
average_quality: float = 1.0
valid_readings: int = 0
invalid_readings: int = 0
# Trend analysis
trend_direction: str = "stable" # increasing, decreasing, stable
trend_confidence: float = 0.0
# Anomaly detection
anomalies_detected: int = 0
last_anomaly: datetime | None = None
    def update_statistics(self, readings: list[SensorReading]) -> None:
"""Update statistics from readings."""
if not readings:
return
numeric_values = []
quality_values = []
for reading in readings:
if isinstance(reading.value, int | float) and reading.is_valid():
numeric_values.append(float(reading.value))
quality_values.append(reading.quality)
self.valid_readings += 1
else:
self.invalid_readings += 1
if numeric_values:
self.count = len(numeric_values)
self.min_value = min(numeric_values)
self.max_value = max(numeric_values)
self.average = statistics.mean(numeric_values)
self.median = statistics.median(numeric_values)
if len(numeric_values) > 1:
self.standard_deviation = statistics.stdev(numeric_values)
self.variance = statistics.variance(numeric_values)
# Calculate percentiles
sorted_values = sorted(numeric_values)
self.percentile_25 = self._percentile(sorted_values, 25)
self.percentile_75 = self._percentile(sorted_values, 75)
self.percentile_95 = self._percentile(sorted_values, 95)
# Update quality metrics
if quality_values:
self.average_quality = statistics.mean(quality_values)
# Update time-based statistics
self.first_reading = min(reading.timestamp for reading in readings)
self.last_reading = max(reading.timestamp for reading in readings)
# Calculate reading frequency
time_span = (
self.last_reading - self.first_reading
).total_seconds() / 3600 # hours
if time_span > 0:
self.reading_frequency = len(readings) / time_span
# Update trend analysis
self._update_trend_analysis(numeric_values)
def _percentile(self, sorted_values: list[float], percentile: float) -> float:
"""Calculate percentile value."""
if not sorted_values:
return 0.0
index = (percentile / 100) * (len(sorted_values) - 1)
lower_index = int(index)
upper_index = min(lower_index + 1, len(sorted_values) - 1)
if lower_index == upper_index:
return sorted_values[lower_index]
# Linear interpolation
weight = index - lower_index
return (
sorted_values[lower_index] * (1 - weight)
+ sorted_values[upper_index] * weight
)
    def _update_trend_analysis(self, values: list[float]) -> None:
"""Update trend analysis."""
if len(values) < 3:
self.trend_direction = "stable"
self.trend_confidence = 0.0
return
# Simple linear trend analysis
recent_values = values[-10:] # Use last 10 values
if len(recent_values) >= 3:
# Calculate trend slope
x_values = list(range(len(recent_values)))
n = len(recent_values)
sum_x = sum(x_values)
sum_y = sum(recent_values)
sum_xy = sum(x * y for x, y in zip(x_values, recent_values, strict=False))
sum_x2 = sum(x * x for x in x_values)
slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x)
# Determine trend direction
if abs(slope) < 0.01: # Very small slope
self.trend_direction = "stable"
self.trend_confidence = 0.5
elif slope > 0:
self.trend_direction = "increasing"
self.trend_confidence = min(abs(slope) * 10, 1.0)
else:
self.trend_direction = "decreasing"
self.trend_confidence = min(abs(slope) * 10, 1.0)
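# The trend slope above is the ordinary least-squares estimate over the last
# ten readings: slope = (n*Σxy - Σx*Σy) / (n*Σx² - (Σx)²) with x = 0..n-1.
# Confidence scales with |slope| and saturates at 1.0.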
@dataclass
class SensorAlert:
"""Sensor alert information."""
alert_id: str
sensor_id: SensorId
severity: AlertSeverity
# Alert details
title: str
description: str
trigger_value: float | str | bool
threshold_value: float | str | bool
# Timing
triggered_at: datetime
acknowledged_at: datetime | None = None
resolved_at: datetime | None = None
# Status
acknowledged: bool = False
resolved: bool = False
# Context
sensor_reading: SensorReading | None = None
trigger_condition: AutomationCondition | None = None
# Metadata
metadata: dict[str, Any] = field(default_factory=dict)
def acknowledge(self, acknowledged_by: str | None = None) -> None:
"""Acknowledge the alert."""
self.acknowledged = True
self.acknowledged_at = datetime.now(UTC)
if acknowledged_by:
self.metadata["acknowledged_by"] = acknowledged_by
def resolve(self, resolution_note: str | None = None) -> None:
"""Resolve the alert."""
self.resolved = True
self.resolved_at = datetime.now(UTC)
if resolution_note:
self.metadata["resolution_note"] = resolution_note
class SensorManager:
"""Advanced sensor data management with real-time processing and analytics."""
    def __init__(self) -> None:
self.sensors: dict[SensorId, SensorConfiguration] = {}
self.sensor_data: dict[SensorId, deque] = {} # Recent readings
self.sensor_statistics: dict[SensorId, SensorStatistics] = {}
# Automation and triggers
self.automation_conditions: dict[str, AutomationCondition] = {}
self.automation_actions: dict[str, AutomationAction] = {}
self.active_alerts: dict[str, SensorAlert] = {}
# Data processing
self.aggregation_results: dict[SensorId, dict[str, Any]] = {}
self.anomaly_history: dict[SensorId, list[dict[str, Any]]] = defaultdict(list)
# Performance metrics
self.processing_metrics = {
"readings_processed": 0,
"triggers_evaluated": 0,
"alerts_generated": 0,
"anomalies_detected": 0,
"average_processing_time": 0.0,
}
# Event handlers
self.reading_received_handlers: list[Callable[[SensorReading], None]] = []
self.trigger_activated_handlers: list[
Callable[[AutomationCondition, SensorReading], None]
] = []
self.alert_generated_handlers: list[Callable[[SensorAlert], None]] = []
self.anomaly_detected_handlers: list[
Callable[[SensorId, SensorReading, dict[str, Any]], None]
] = []
# Background processing
self._processing_task: asyncio.Task | None = None
self._analytics_task: asyncio.Task | None = None
        # Start background services (only if an event loop is running); keep a
        # reference so the fire-and-forget startup task is not garbage collected
        self._startup_task: asyncio.Task | None = None
        try:
            asyncio.get_running_loop()
            self._startup_task = asyncio.create_task(
                self._start_background_services(),
            )
        except RuntimeError:
            # No running event loop; background services stay disabled
            pass
@require(lambda config: isinstance(config, SensorConfiguration))
async def register_sensor(
self,
config: SensorConfiguration,
) -> Either[IoTIntegrationError, bool]:
"""Register a new sensor for monitoring."""
try:
if config.sensor_id in self.sensors:
return Either.error(
IoTIntegrationError(
f"Sensor already registered: {config.sensor_id}",
),
)
# Validate configuration
if config.collection_interval < 1:
return Either.error(
IoTIntegrationError(
f"Invalid collection interval: {config.collection_interval}",
),
)
if config.quality_threshold < 0 or config.quality_threshold > 1:
return Either.error(
IoTIntegrationError(
f"Invalid quality threshold: {config.quality_threshold}",
),
)
# Register sensor
self.sensors[config.sensor_id] = config
self.sensor_data[config.sensor_id] = deque(
maxlen=config.buffer_size * 10,
) # Store more for analytics
self.sensor_statistics[config.sensor_id] = SensorStatistics(
sensor_id=config.sensor_id,
)
self.aggregation_results[config.sensor_id] = {}
return Either.success(True)
except Exception as e:
return Either.error(
IoTIntegrationError(f"Sensor registration failed: {e!s}"),
)
@require(lambda reading: isinstance(reading, SensorReading))
async def process_sensor_reading(
self,
reading: SensorReading,
) -> Either[IoTIntegrationError, dict[str, Any]]:
"""Process a new sensor reading."""
try:
processing_start = datetime.now(UTC)
# Check if sensor is registered
if reading.sensor_id not in self.sensors:
return Either.error(
IoTIntegrationError(f"Sensor not registered: {reading.sensor_id}"),
)
config = self.sensors[reading.sensor_id]
# Validate reading
validation_result = self._validate_reading(reading, config)
if validation_result.is_error():
return validation_result
# Store reading
self.sensor_data[reading.sensor_id].append(reading)
# Process reading
processing_results = {
"sensor_id": reading.sensor_id,
"reading_timestamp": reading.timestamp.isoformat(),
"value": reading.value,
"quality": reading.quality,
"processed_at": datetime.now(UTC).isoformat(),
}
# Apply smoothing if enabled
if config.enable_smoothing:
smoothed_value = self._apply_smoothing(reading, config)
processing_results["smoothed_value"] = smoothed_value
# Anomaly detection
if config.enable_anomaly_detection:
anomaly_result = await self._detect_anomaly(reading, config)
if anomaly_result:
processing_results["anomaly_detected"] = anomaly_result
self.anomaly_history[reading.sensor_id].append(
{
"timestamp": reading.timestamp.isoformat(),
"value": reading.value,
"anomaly_details": anomaly_result,
},
)
# Trigger anomaly event handlers
for handler in self.anomaly_detected_handlers:
with contextlib.suppress(Exception):
handler(reading.sensor_id, reading, anomaly_result)
# Trigger evaluation
if config.enable_triggers:
triggered_conditions = await self._evaluate_triggers(reading, config)
if triggered_conditions:
processing_results["triggered_conditions"] = [
condition.condition_id for condition in triggered_conditions
]
# Execute triggered actions
for condition in triggered_conditions:
await self._execute_trigger_actions(condition, reading)
# Update statistics
await self._update_sensor_statistics(reading.sensor_id)
# Trigger reading event handlers
for handler in self.reading_received_handlers:
with contextlib.suppress(Exception):
handler(reading)
# Update performance metrics
processing_time = (
datetime.now(UTC) - processing_start
).total_seconds() * 1000
self.processing_metrics["readings_processed"] += 1
current_avg = self.processing_metrics["average_processing_time"]
total_readings = self.processing_metrics["readings_processed"]
self.processing_metrics["average_processing_time"] = (
current_avg * (total_readings - 1) + processing_time
) / total_readings
return Either.success(processing_results)
except Exception as e:
return Either.error(
IoTIntegrationError(f"Reading processing failed: {e!s}"),
)
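    # Processing sketch (hedged: SensorReading field names mirror their use in
    # get_sensor_reading below but are otherwise assumed):
    #
    #     reading = SensorReading(
    #         sensor_id=config.sensor_id,
    #         sensor_type=config.sensor_type,
    #         value=22.5,
    #         unit="°C",
    #         timestamp=datetime.now(UTC),
    #         quality=0.95,
    #     )
    #     result = await manager.process_sensor_reading(reading)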
async def add_automation_condition(
self,
condition: AutomationCondition,
) -> Either[IoTIntegrationError, bool]:
"""Add automation condition for sensor-based triggers."""
try:
if condition.condition_id in self.automation_conditions:
return Either.error(
IoTIntegrationError(
f"Condition already exists: {condition.condition_id}",
),
)
# Validate condition
if condition.sensor_id and condition.sensor_id not in self.sensors:
return Either.error(
IoTIntegrationError(
f"Sensor not registered: {condition.sensor_id}",
),
)
self.automation_conditions[condition.condition_id] = condition
return Either.success(True)
except Exception as e:
return Either.error(
IoTIntegrationError(f"Failed to add automation condition: {e!s}"),
)
async def add_automation_action(
self,
action: AutomationAction,
) -> Either[IoTIntegrationError, bool]:
"""Add automation action for triggered conditions."""
try:
if action.action_id in self.automation_actions:
return Either.error(
IoTIntegrationError(f"Action already exists: {action.action_id}"),
)
self.automation_actions[action.action_id] = action
return Either.success(True)
except Exception as e:
return Either.error(
IoTIntegrationError(f"Failed to add automation action: {e!s}"),
)
async def get_sensor_data(
self,
sensor_id: SensorId,
time_range: timedelta | None = None,
aggregation: DataAggregationMethod | None = None,
) -> Either[IoTIntegrationError, dict[str, Any]]:
"""Get sensor data with optional time filtering and aggregation."""
try:
if sensor_id not in self.sensors:
return Either.error(
IoTIntegrationError(f"Sensor not registered: {sensor_id}"),
)
readings = list(self.sensor_data[sensor_id])
# Apply time filtering
if time_range:
cutoff_time = datetime.now(UTC) - time_range
readings = [r for r in readings if r.timestamp >= cutoff_time]
# Prepare base result
result = {
"sensor_id": sensor_id,
"sensor_type": self.sensors[sensor_id].sensor_type.value,
"readings_count": len(readings),
"time_range": time_range.total_seconds() if time_range else None,
"data": [reading.to_dict() for reading in readings],
}
# Apply aggregation
if aggregation and readings:
aggregated_value = await self._aggregate_data(readings, aggregation)
result["aggregated_value"] = aggregated_value
result["aggregation_method"] = aggregation.value
# Include statistics
if sensor_id in self.sensor_statistics:
stats = self.sensor_statistics[sensor_id]
result["statistics"] = {
"count": stats.count,
"average": stats.average,
"min_value": stats.min_value,
"max_value": stats.max_value,
"standard_deviation": stats.standard_deviation,
"trend_direction": stats.trend_direction,
"trend_confidence": stats.trend_confidence,
"anomalies_detected": stats.anomalies_detected,
}
return Either.success(result)
except Exception as e:
return Either.error(
IoTIntegrationError(f"Failed to get sensor data: {e!s}"),
)
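    # Query sketch (hedged; both time filtering and aggregation are optional):
    #
    #     result = await manager.get_sensor_data(
    #         sensor_id,
    #         time_range=timedelta(hours=1),
    #         aggregation=DataAggregationMethod.AVERAGE,
    #     )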
async def get_active_alerts(
self,
severity_filter: AlertSeverity | None = None,
) -> list[SensorAlert]:
"""Get active sensor alerts with optional severity filtering."""
alerts = [alert for alert in self.active_alerts.values() if not alert.resolved]
if severity_filter:
alerts = [alert for alert in alerts if alert.severity == severity_filter]
# Sort by severity and timestamp
severity_order = {
AlertSeverity.CRITICAL: 0,
AlertSeverity.ERROR: 1,
AlertSeverity.WARNING: 2,
AlertSeverity.INFO: 3,
}
alerts.sort(key=lambda a: (severity_order.get(a.severity, 99), a.triggered_at))
return alerts
# Private methods
def _validate_reading(
self,
reading: SensorReading,
config: SensorConfiguration,
) -> Either[IoTIntegrationError, bool]:
"""Validate sensor reading against configuration."""
try:
# Check sensor type match
if reading.sensor_type != config.sensor_type:
return Either.error(
IoTIntegrationError(
f"Sensor type mismatch: {reading.sensor_type} != {config.sensor_type}",
),
)
# Check quality threshold
if reading.quality < config.quality_threshold:
return Either.error(
IoTIntegrationError(
f"Reading quality below threshold: {reading.quality} < {config.quality_threshold}",
),
)
# Check value range
if isinstance(reading.value, int | float):
if config.min_value is not None and reading.value < config.min_value:
return Either.error(
IoTIntegrationError(
f"Value below minimum: {reading.value} < {config.min_value}",
),
)
if config.max_value is not None and reading.value > config.max_value:
return Either.error(
IoTIntegrationError(
f"Value above maximum: {reading.value} > {config.max_value}",
),
)
# Check unit match
if config.expected_unit and reading.unit != config.expected_unit:
return Either.error(
IoTIntegrationError(
f"Unit mismatch: {reading.unit} != {config.expected_unit}",
),
)
return Either.success(True)
except Exception as e:
return Either.error(
IoTIntegrationError(f"Reading validation failed: {e!s}"),
)
def _apply_smoothing(
self,
reading: SensorReading,
config: SensorConfiguration,
) -> float | Any:
"""Apply exponential smoothing to sensor value."""
if not isinstance(reading.value, int | float):
return reading.value
        # Get recent readings for smoothing, excluding the current reading,
        # which process_sensor_reading has already appended (otherwise the
        # value would be blended with itself and smoothing would be a no-op)
        recent_readings = list(self.sensor_data[reading.sensor_id])[:-1][-5:]
        if not recent_readings:
            return reading.value
        # Blend with the most recent previous numeric reading
        # (one exponential smoothing step)
        alpha = config.smoothing_factor
        smoothed_value = float(reading.value)
        for prev_reading in reversed(recent_readings):
            if isinstance(prev_reading.value, int | float):
                smoothed_value = (
                    alpha * smoothed_value + (1 - alpha) * float(prev_reading.value)
                )
                break
        return smoothed_value
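    # The smoothing above is a single exponential step,
    #     smoothed = alpha * x_new + (1 - alpha) * x_prev,
    # so with the default alpha of 0.1 the previous reading dominates and
    # short spikes are damped.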
async def _detect_anomaly(
self,
reading: SensorReading,
config: SensorConfiguration,
) -> dict[str, Any] | None:
"""Detect anomalies in sensor readings."""
if not isinstance(reading.value, int | float):
return None
        # Get historical data, excluding the just-appended current reading so
        # the candidate value does not contaminate its own baseline
        recent_readings = list(self.sensor_data[reading.sensor_id])[:-1]
        if len(recent_readings) < 10:  # Need enough data for anomaly detection
            return None
        # Calculate baseline statistics
        historical_values = [
            float(r.value) for r in recent_readings if isinstance(r.value, int | float)
        ]
if len(historical_values) < 5:
return None
mean_value = statistics.mean(historical_values)
std_dev = (
statistics.stdev(historical_values) if len(historical_values) > 1 else 0
)
if std_dev == 0:
return None
# Calculate z-score
z_score = abs((reading.value - mean_value) / std_dev)
if z_score > config.anomaly_threshold:
self.processing_metrics["anomalies_detected"] += 1
return {
"type": "statistical_anomaly",
"z_score": z_score,
"threshold": config.anomaly_threshold,
"baseline_mean": mean_value,
"baseline_std": std_dev,
"severity": "high"
if z_score > config.anomaly_threshold * 1.5
else "medium",
}
return None
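    # The anomaly rule above flags a reading when |x - mean| / stddev exceeds
    # config.anomaly_threshold (default 2.0 standard deviations) and marks it
    # "high" severity beyond 1.5x the threshold.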
async def _evaluate_triggers(
self,
reading: SensorReading,
_config: SensorConfiguration,
) -> list[AutomationCondition]:
"""Evaluate automation triggers for sensor reading."""
triggered_conditions = []
for condition in self.automation_conditions.values():
if (
condition.sensor_id == reading.sensor_id
and condition.enabled
and condition.evaluate(sensor_reading=reading)
):
triggered_conditions.append(condition)
# Update trigger metrics
condition.last_triggered = datetime.now(UTC)
condition.trigger_count += 1
# Trigger event handlers
for handler in self.trigger_activated_handlers:
with contextlib.suppress(Exception):
handler(condition, reading)
self.processing_metrics["triggers_evaluated"] += len(self.automation_conditions)
return triggered_conditions
async def _execute_trigger_actions(
self,
condition: AutomationCondition,
reading: SensorReading,
) -> None:
"""Execute actions for triggered condition."""
        # Action-to-condition wiring is not implemented yet; for now, every
        # triggered condition generates a WARNING alert
alert = SensorAlert(
alert_id=f"alert_{condition.condition_id}_{int(datetime.now(UTC).timestamp())}",
sensor_id=reading.sensor_id,
severity=AlertSeverity.WARNING,
title=f"Sensor Trigger: {condition.condition_id}",
description=f"Condition {condition.condition_id} triggered for sensor {reading.sensor_id}",
trigger_value=reading.value,
threshold_value=condition.threshold_value,
triggered_at=datetime.now(UTC),
sensor_reading=reading,
trigger_condition=condition,
)
self.active_alerts[alert.alert_id] = alert
self.processing_metrics["alerts_generated"] += 1
# Trigger alert event handlers
for handler in self.alert_generated_handlers:
with contextlib.suppress(Exception):
handler(alert)
async def _update_sensor_statistics(self, sensor_id: SensorId) -> None:
"""Update sensor statistics."""
if sensor_id in self.sensor_statistics and sensor_id in self.sensor_data:
recent_readings = list(self.sensor_data[sensor_id])
self.sensor_statistics[sensor_id].update_statistics(recent_readings)
async def _aggregate_data(
self,
readings: list[SensorReading],
method: DataAggregationMethod,
) -> Any:
"""Aggregate sensor data using specified method."""
numeric_values = [r.value for r in readings if isinstance(r.value, int | float)]
if not numeric_values:
return None
if method == DataAggregationMethod.AVERAGE:
return statistics.mean(numeric_values)
if method == DataAggregationMethod.MINIMUM:
return min(numeric_values)
if method == DataAggregationMethod.MAXIMUM:
return max(numeric_values)
if method == DataAggregationMethod.SUM:
return sum(numeric_values)
if method == DataAggregationMethod.COUNT:
return len(numeric_values)
if method == DataAggregationMethod.MEDIAN:
return statistics.median(numeric_values)
if method == DataAggregationMethod.STANDARD_DEVIATION:
return statistics.stdev(numeric_values) if len(numeric_values) > 1 else 0
        if method == DataAggregationMethod.RANGE:
            return max(numeric_values) - min(numeric_values)
        if method == DataAggregationMethod.MODE:
            return statistics.mode(numeric_values)
        # PERCENTILE needs a percentile parameter; like any other unhandled
        # method, it falls back to the average
        return statistics.mean(numeric_values)
# Background services
async def _start_background_services(self) -> None:
"""Start background processing and analytics services."""
self._processing_task = asyncio.create_task(self._background_processing_loop())
self._analytics_task = asyncio.create_task(self._analytics_loop())
async def _background_processing_loop(self) -> None:
"""Background processing loop for periodic tasks."""
while True:
try:
await asyncio.sleep(60) # Run every minute
# Clean up old data
await self._cleanup_old_data()
# Update aggregation results
await self._update_aggregations()
except asyncio.CancelledError:
break
except Exception:
await asyncio.sleep(60) # Error recovery
    async def _analytics_loop(self) -> None:
"""Background analytics loop for advanced processing."""
while True:
try:
await asyncio.sleep(300) # Run every 5 minutes
# Update trend analysis
await self._update_trend_analysis()
# Clean up resolved alerts
await self._cleanup_resolved_alerts()
except asyncio.CancelledError:
break
except Exception:
await asyncio.sleep(60) # Error recovery
async def _cleanup_old_data(self) -> None:
"""Clean up old sensor data based on retention settings."""
for sensor_id, config in self.sensors.items():
if sensor_id in self.sensor_data:
cutoff_time = datetime.now(UTC) - timedelta(
hours=config.data_retention_hours,
)
# Remove old readings
readings = self.sensor_data[sensor_id]
while readings and readings[0].timestamp < cutoff_time:
readings.popleft()
async def _update_aggregations(self) -> None:
"""Update aggregation results for all sensors."""
for sensor_id in self.sensors:
if sensor_id in self.sensor_data:
readings = list(self.sensor_data[sensor_id])
if readings:
                    # Calculate various aggregations ("hourly" assumes roughly
                    # one reading per minute; "daily" spans the full buffer)
self.aggregation_results[sensor_id] = {
"hourly_average": await self._aggregate_data(
readings[-60:],
DataAggregationMethod.AVERAGE,
),
"daily_average": await self._aggregate_data(
readings,
DataAggregationMethod.AVERAGE,
),
"min_today": await self._aggregate_data(
readings,
DataAggregationMethod.MINIMUM,
),
"max_today": await self._aggregate_data(
readings,
DataAggregationMethod.MAXIMUM,
),
"last_updated": datetime.now(UTC).isoformat(),
}
async def _update_trend_analysis(self) -> None:
"""Update trend analysis for all sensors."""
for sensor_id in self.sensor_statistics:
await self._update_sensor_statistics(sensor_id)
async def _cleanup_resolved_alerts(self) -> None:
"""Clean up old resolved alerts."""
cutoff_time = datetime.now(UTC) - timedelta(
days=7,
) # Keep resolved alerts for 7 days
resolved_alert_ids = [
alert_id
for alert_id, alert in self.active_alerts.items()
if alert.resolved and alert.resolved_at and alert.resolved_at < cutoff_time
]
for alert_id in resolved_alert_ids:
del self.active_alerts[alert_id]
# Event handler management
def add_reading_received_handler(
self,
handler: Callable[[SensorReading], None],
) -> None:
"""Add reading received event handler."""
self.reading_received_handlers.append(handler)
def add_trigger_activated_handler(
self,
handler: Callable[[AutomationCondition, SensorReading], None],
) -> None:
"""Add trigger activated event handler."""
self.trigger_activated_handlers.append(handler)
def add_alert_generated_handler(
self,
handler: Callable[[SensorAlert], None],
) -> None:
"""Add alert generated event handler."""
self.alert_generated_handlers.append(handler)
def add_anomaly_detected_handler(
self,
handler: Callable[[SensorId, SensorReading, dict[str, Any]], None],
) -> None:
"""Add anomaly detected event handler."""
self.anomaly_detected_handlers.append(handler)
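    # Handler sketch (hedged): handlers run synchronously inside the
    # processing path, so they should be fast and non-blocking.
    #
    #     manager.add_alert_generated_handler(
    #         lambda alert: print(f"[{alert.severity.value}] {alert.title}"),
    #     )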
# Utility methods
async def start_monitoring(
self,
sensor_ids: list[SensorId],
duration_seconds: int,
sampling_interval: int,
alert_thresholds: dict[str, float],
real_time_alerts: bool,
) -> Either[IoTIntegrationError, dict[str, Any]]:
"""Start monitoring session for specified sensors."""
try:
session_id = f"monitoring_{int(datetime.now(UTC).timestamp())}"
# Validate all sensor IDs
for sensor_id in sensor_ids:
if sensor_id not in self.sensors:
return Either.error(
IoTIntegrationError(f"Sensor not registered: {sensor_id}"),
)
session_info = {
"session_id": session_id,
"sensor_ids": sensor_ids,
"duration_seconds": duration_seconds,
"sampling_interval": sampling_interval,
"alert_thresholds": alert_thresholds,
"real_time_alerts": real_time_alerts,
"started_at": datetime.now(UTC).isoformat(),
}
return Either.success(session_info)
except Exception as e:
return Either.error(
IoTIntegrationError(f"Failed to start monitoring: {e!s}"),
)
async def get_sensor_reading(
self,
sensor_id: SensorId,
) -> Either[IoTIntegrationError, SensorReading]:
"""Get current sensor reading (simulated for demo)."""
try:
if sensor_id not in self.sensors:
return Either.error(
IoTIntegrationError(f"Sensor not registered: {sensor_id}"),
)
config = self.sensors[sensor_id]
            # Simulate a sensor reading based on the sensor type (demo only);
            # random is imported locally because it is simulation-only
            import random
if config.sensor_type == SensorType.TEMPERATURE:
value = round(random.uniform(18.0, 28.0), 1) # noqa: S311 # Sensor simulation data
unit = "°C"
elif config.sensor_type == SensorType.HUMIDITY:
value = round(random.uniform(30.0, 70.0), 1) # noqa: S311 # Sensor simulation data
unit = "%"
elif config.sensor_type == SensorType.LIGHT:
value = random.randint(0, 1000) # noqa: S311 # Sensor simulation data
unit = "lux"
elif config.sensor_type == SensorType.MOTION:
value = random.choice([True, False]) # noqa: S311 # Sensor simulation data
unit = None
elif config.sensor_type == SensorType.AIR_QUALITY:
value = random.randint(50, 300) # noqa: S311 # Sensor simulation data
unit = "AQI"
else:
value = round(random.uniform(0.0, 100.0), 2) # noqa: S311 # Sensor simulation data
unit = "units"
reading = SensorReading(
sensor_id=sensor_id,
sensor_type=config.sensor_type,
value=value,
unit=unit,
timestamp=datetime.now(UTC),
quality=random.uniform(0.8, 1.0), # noqa: S311 # Sensor simulation data
location=config.location,
device_id=config.device_id,
)
return Either.success(reading)
except Exception as e:
return Either.error(
IoTIntegrationError(f"Failed to get sensor reading: {e!s}"),
)
def get_sensor_metrics(self) -> dict[str, Any]:
"""Get sensor manager metrics."""
total_sensors = len(self.sensors)
total_readings = sum(len(data) for data in self.sensor_data.values())
active_alerts = len(
[alert for alert in self.active_alerts.values() if not alert.resolved],
)
return {
**self.processing_metrics,
"total_sensors": total_sensors,
"total_readings": total_readings,
"active_alerts": active_alerts,
"sensor_types": list(
{config.sensor_type.value for config in self.sensors.values()},
),
"automation_conditions": len(self.automation_conditions),
"automation_actions": len(self.automation_actions),
}
# Export the sensor manager
__all__ = [
"AlertSeverity",
"DataAggregationMethod",
"SensorAlert",
"SensorConfiguration",
"SensorManager",
"SensorStatistics",
"TriggerEvaluationMode",
]
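

# A minimal end-to-end demo, guarded so imports stay side-effect free. This is
# a hedged sketch: SensorId construction and the SensorReading field names are
# assumptions based on their use in this module, not confirmed API.
if __name__ == "__main__":

    async def _demo() -> None:
        manager = SensorManager()
        config = SensorConfiguration(
            sensor_id=SensorId("demo_temp"),  # assumed: SensorId wraps a str
            sensor_type=SensorType.TEMPERATURE,
            sensor_name="Demo Temperature Sensor",
        )
        await manager.register_sensor(config)
        reading = SensorReading(  # field names mirror get_sensor_reading above
            sensor_id=config.sensor_id,
            sensor_type=config.sensor_type,
            value=22.5,
            unit="°C",
            timestamp=datetime.now(UTC),
            quality=0.95,
        )
        await manager.process_sensor_reading(reading)
        print(manager.get_sensor_metrics())

    # asyncio.run cancels the outstanding background tasks when the loop closes
    asyncio.run(_demo())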