"""IoT Real-Time Processing Engine - TASK_65 Phase 5 Integration & Optimization.
Real-time IoT data processing, stream analytics, event-driven automation,
and low-latency decision-making for responsive IoT systems.
Architecture: Stream Processing + Event-Driven Architecture + Real-Time Analytics + Low-Latency Decision Engine
Performance: <10ms event processing, <50ms stream analytics, <100ms decision execution
Security: Real-time threat detection, streaming encryption, secure event processing
"""
from __future__ import annotations
import asyncio
import logging
import statistics
import uuid
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import Enum
from typing import TYPE_CHECKING, Any
from ..core.contracts import ensure, require
from ..core.either import Either
from ..core.iot_architecture import (
DeviceId,
IoTIntegrationError,
)
if TYPE_CHECKING:
from collections.abc import Callable
logger = logging.getLogger(__name__)
class ProcessingMode(Enum):
"""Real-time processing modes."""
STREAMING = "streaming"
MICRO_BATCH = "micro_batch"
EVENT_DRIVEN = "event_driven"
HYBRID = "hybrid"
class EventType(Enum):
"""Types of real-time events."""
SENSOR_READING = "sensor_reading"
DEVICE_STATUS = "device_status"
THRESHOLD_BREACH = "threshold_breach"
PATTERN_DETECTED = "pattern_detected"
ANOMALY_DETECTED = "anomaly_detected"
SYSTEM_ALERT = "system_alert"
USER_ACTION = "user_action"
AUTOMATION_TRIGGER = "automation_trigger"
class StreamOperationType(Enum):
"""Types of stream operations."""
FILTER = "filter"
MAP = "map"
REDUCE = "reduce"
WINDOW = "window"
JOIN = "join"
AGGREGATE = "aggregate"
ENRICH = "enrich"
VALIDATE = "validate"
EventId = str
StreamId = str
ProcessorId = str
@dataclass
class RealTimeEvent:
"""Real-time IoT event."""
event_id: EventId
event_type: EventType
source_device: DeviceId
timestamp: datetime
data: dict[str, Any]
priority: int # 1-10, 10 being highest
correlation_id: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
def is_high_priority(self) -> bool:
"""Check if event is high priority."""
return self.priority >= 7
def age_ms(self) -> float:
"""Get event age in milliseconds."""
return (datetime.now(UTC) - self.timestamp).total_seconds() * 1000
@dataclass
class StreamWindow:
"""Time-based or count-based window for stream processing."""
window_id: str
window_type: str # "time", "count", "session"
size: int | timedelta # count or time duration
    events: deque[RealTimeEvent] = field(default_factory=deque)
created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
    def add_event(self, event: RealTimeEvent) -> None:
        """Add event to window and enforce its size constraints."""
        self.events.append(event)
        self._maintain_window_size()
    def _maintain_window_size(self) -> None:
        """Maintain window size constraints."""
        if self.window_type == "count" and isinstance(self.size, int):
            while len(self.events) > self.size:
                self.events.popleft()
        elif self.window_type == "time" and isinstance(self.size, timedelta):
            cutoff_time = datetime.now(UTC) - self.size
            while self.events and self.events[0].timestamp < cutoff_time:
                self.events.popleft()
def get_events(self) -> list[RealTimeEvent]:
"""Get all events in window."""
return list(self.events)
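# Illustrative sketch (example-local names, not engine API): how a
# count-based StreamWindow trims itself to the newest `size` events.
def _example_count_window() -> list[str]:
    """Return the event ids left in a count window after overflow."""
    window = StreamWindow(window_id="example_window", window_type="count", size=3)
    for i in range(5):
        window.add_event(
            RealTimeEvent(
                event_id=f"evt_{i}",
                event_type=EventType.SENSOR_READING,
                source_device="example_device",
                timestamp=datetime.now(UTC),
                data={"value": i},
                priority=5,
            ),
        )
    # Only the newest three remain: evt_2, evt_3, evt_4
    return [event.event_id for event in window.get_events()]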
@dataclass
class StreamProcessor:
"""Stream processing pipeline component."""
processor_id: ProcessorId
operation_type: StreamOperationType
operation_function: Callable
input_streams: list[StreamId]
output_streams: list[StreamId]
processing_stats: dict[str, float] = field(default_factory=dict)
async def process(self, event: RealTimeEvent) -> list[RealTimeEvent]:
"""Process event through this processor."""
start_time = datetime.now(UTC)
try:
if self.operation_type == StreamOperationType.FILTER:
result = [event] if await self.operation_function(event) else []
elif self.operation_type == StreamOperationType.MAP:
transformed = await self.operation_function(event)
result = [transformed] if transformed else []
elif self.operation_type == StreamOperationType.ENRICH:
enriched = await self.operation_function(event)
result = [enriched] if enriched else []
else:
result = await self.operation_function(event)
# Update processing stats
processing_time = (datetime.now(UTC) - start_time).total_seconds() * 1000
self.processing_stats["last_processing_time"] = processing_time
self.processing_stats["total_processed"] = (
self.processing_stats.get("total_processed", 0) + 1
)
return result if isinstance(result, list) else [result] if result else []
except Exception as e:
logger.error(f"Stream processor {self.processor_id} failed: {e!s}")
return []
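# Illustrative MAP-stage sketch (assumption: a unit conversion is a
# representative transform; all names here are example-local, not engine API).
def _example_map_processor() -> StreamProcessor:
    """Build a MAP processor converting Celsius readings to Fahrenheit."""
    async def celsius_to_fahrenheit(event: RealTimeEvent) -> RealTimeEvent | None:
        if "celsius" not in event.data:
            return None  # returning None drops the event (see process above)
        converted = event.data.copy()
        converted["fahrenheit"] = converted["celsius"] * 9 / 5 + 32
        return RealTimeEvent(
            event_id=f"mapped_{event.event_id}",
            event_type=event.event_type,
            source_device=event.source_device,
            timestamp=event.timestamp,
            data=converted,
            priority=event.priority,
            correlation_id=event.event_id,
        )
    return StreamProcessor(
        processor_id="example_celsius_to_fahrenheit",
        operation_type=StreamOperationType.MAP,
        operation_function=celsius_to_fahrenheit,
        input_streams=["*"],
        output_streams=["converted_stream"],
    )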
class RealTimeProcessor:
"""Real-time IoT data processing engine with stream analytics.
Contracts:
Preconditions:
- All events must have valid timestamps and device sources
- Stream processors must be properly configured and validated
- Real-time constraints must be maintained (<100ms processing)
Postconditions:
- Events are processed within latency requirements
- Stream operations maintain data integrity and ordering
- Processing metrics are tracked and optimized continuously
Invariants:
- Event ordering is preserved within streams
- Processing latency remains within specified bounds
- Resource usage is monitored and controlled
"""
def __init__(self):
self.event_streams: dict[StreamId, deque] = defaultdict(
lambda: deque(maxlen=10000),
)
self.stream_processors: dict[ProcessorId, StreamProcessor] = {}
self.processing_pipelines: dict[str, list[ProcessorId]] = {}
self.stream_windows: dict[str, StreamWindow] = {}
        # Real-time metrics
        self.total_events_processed = 0
        self._events_at_last_update = 0  # snapshot for events/sec calculation
        self.events_per_second = 0.0
        self.average_latency = 0.0
        self.peak_latency = 0.0
        self.processing_queue_size = 0
# Performance monitoring
self.latency_history: deque = deque(maxlen=1000)
self.throughput_history: deque = deque(maxlen=100)
self.error_count = 0
self.last_metrics_update = datetime.now(UTC)
# Event handlers and automation triggers
self.event_handlers: dict[EventType, list[Callable]] = defaultdict(list)
self.automation_triggers: dict[str, dict[str, Any]] = {}
# Processing configuration
self.processing_mode = ProcessingMode.EVENT_DRIVEN
self.max_processing_latency = 100 # milliseconds
self.batch_size = 100
self.batch_timeout = 50 # milliseconds
# Start background processing
self.processing_task = None
self._start_background_processing()
    def _start_background_processing(self) -> None:
        """Start background processing tasks."""
        if self.processing_task is None:
            try:
                self.processing_task = asyncio.create_task(
                    self._background_processor(),
                )
            except RuntimeError:
                # No running event loop yet (synchronous construction); call
                # this method again from async context to start processing
                logger.warning(
                    "No running event loop; background processing not started",
                )
async def _background_processor(self) -> None:
"""Background task for continuous event processing."""
while True:
try:
# Process pending events
await self._process_pending_events()
# Update metrics
await self._update_metrics()
# Cleanup old data
await self._cleanup_old_data()
# Short sleep to prevent CPU spinning
await asyncio.sleep(0.001) # 1ms
except Exception as e:
logger.error(f"Background processor error: {e!s}")
await asyncio.sleep(0.1)
@require(lambda __self, event: event.event_id and event.source_device)
@ensure(lambda __self, result: result.is_success() or result.error_value)
async def process_event(
self,
event: RealTimeEvent,
) -> Either[IoTIntegrationError, dict[str, Any]]:
"""Process real-time IoT event through the processing pipeline.
Performance:
- <10ms event ingestion
- <50ms stream processing
- <100ms total processing latency
"""
try:
start_time = datetime.now(UTC)
# Validate event
if event.age_ms() > 5000: # 5 seconds max age
return Either.error(
IoTIntegrationError(f"Event too old: {event.age_ms()}ms"),
)
# Add to appropriate stream
stream_id = f"device_stream_{event.source_device}"
self.event_streams[stream_id].append(event)
# Process through pipeline
processing_results = []
if self.processing_mode == ProcessingMode.EVENT_DRIVEN:
results = await self._process_event_driven(event)
processing_results.extend(results)
elif self.processing_mode == ProcessingMode.STREAMING:
results = await self._process_streaming(event)
processing_results.extend(results)
elif self.processing_mode == ProcessingMode.MICRO_BATCH:
results = await self._process_micro_batch(event)
processing_results.extend(results)
            # Trigger event handlers and automation checks, then mark the
            # event so the background drain does not fire them a second time
            await self._trigger_event_handlers(event)
            await self._check_automation_triggers(event)
            event.metadata["handled"] = True
# Update processing metrics
processing_time = (datetime.now(UTC) - start_time).total_seconds() * 1000
self.latency_history.append(processing_time)
self.total_events_processed += 1
# Check latency constraint
if processing_time > self.max_processing_latency:
logger.warning(f"Processing latency exceeded: {processing_time}ms")
processing_info = {
"event_id": event.event_id,
"processing_time_ms": processing_time,
"results_generated": len(processing_results),
"stream_id": stream_id,
"processed_at": datetime.now(UTC).isoformat(),
}
return Either.success(
{
"success": True,
"processing_info": processing_info,
"results": processing_results,
},
)
except Exception as e:
self.error_count += 1
error_msg = f"Failed to process real-time event: {e!s}"
logger.error(error_msg)
return Either.error(IoTIntegrationError(error_msg))
@require(
lambda _self, processor: processor.processor_id
and processor.operation_function,
)
async def register_stream_processor(
self,
processor: StreamProcessor,
) -> Either[IoTIntegrationError, dict[str, Any]]:
"""Register stream processor for real-time data processing.
Architecture:
- Validates processor configuration and dependencies
- Integrates into processing pipeline
- Configures input/output stream routing
"""
try:
# Validate processor
if processor.processor_id in self.stream_processors:
return Either.error(
IoTIntegrationError(
f"Processor {processor.processor_id} already exists",
),
)
# Test processor function
test_event = RealTimeEvent(
event_id="test",
event_type=EventType.SENSOR_READING,
source_device="test_device",
timestamp=datetime.now(UTC),
data={"test": True},
priority=5,
)
try:
await processor.process(test_event)
except Exception as e:
return Either.error(
IoTIntegrationError(f"Processor function test failed: {e!s}"),
)
# Register processor
self.stream_processors[processor.processor_id] = processor
processor_info = {
"processor_id": processor.processor_id,
"operation_type": processor.operation_type.value,
"input_streams": processor.input_streams,
"output_streams": processor.output_streams,
"registered_at": datetime.now(UTC).isoformat(),
}
logger.info(f"Stream processor registered: {processor.processor_id}")
return Either.success(
{
"success": True,
"processor_info": processor_info,
"total_processors": len(self.stream_processors),
},
)
except Exception as e:
error_msg = f"Failed to register stream processor: {e!s}"
logger.error(error_msg)
return Either.error(IoTIntegrationError(error_msg))
async def create_time_window(
self,
window_id: str,
duration: timedelta,
_device_filter: list[DeviceId] | None = None,
) -> Either[IoTIntegrationError, StreamWindow]:
"""Create time-based window for stream aggregation and analysis.
Performance:
- Efficient window maintenance with automatic cleanup
- Optimized event insertion and retrieval
- Memory-efficient sliding window implementation
"""
try:
if window_id in self.stream_windows:
return Either.error(
IoTIntegrationError(f"Window {window_id} already exists"),
)
window = StreamWindow(
window_id=window_id,
window_type="time",
size=duration,
)
self.stream_windows[window_id] = window
logger.info(f"Time window created: {window_id} ({duration})")
return Either.success(window)
except Exception as e:
error_msg = f"Failed to create time window: {e!s}"
logger.error(error_msg)
return Either.error(IoTIntegrationError(error_msg))
async def _process_event_driven(self, event: RealTimeEvent) -> list[dict[str, Any]]:
"""Process event in event-driven mode."""
results = []
# Process through all applicable processors
for processor in self.stream_processors.values():
if self._should_process_event(event, processor):
processed_events = await processor.process(event)
for processed_event in processed_events:
results.append(
{
"processor_id": processor.processor_id,
"event_id": processed_event.event_id,
"operation": processor.operation_type.value,
"result_data": processed_event.data,
},
)
return results
async def _process_streaming(self, event: RealTimeEvent) -> list[dict[str, Any]]:
"""Process event in streaming mode."""
results = []
# Add to relevant windows
for window in self.stream_windows.values():
window.add_event(event)
            # Re-evaluate the window once it holds at least 10 events; note
            # this fires on every new event while the window stays that full
            if len(window.events) >= 10:
window_results = await self._process_window(window)
results.extend(window_results)
return results
    async def _process_micro_batch(self, _event: RealTimeEvent) -> list[dict[str, Any]]:
        """Process event in micro-batch mode."""
        # Only a counter is kept here; the event itself already sits in its
        # device stream (see process_event)
        self.processing_queue_size += 1
        # Flush once the configured size is reached (batch_timeout is not
        # currently wired into this path)
        if self.processing_queue_size >= self.batch_size:
            return await self._process_batch()
        return []
async def _process_window(self, window: StreamWindow) -> list[dict[str, Any]]:
"""Process events in a window."""
results = []
events = window.get_events()
if not events:
return results
# Calculate window statistics
if events[0].event_type == EventType.SENSOR_READING:
values = [
event.data.get("value", 0) for event in events if "value" in event.data
]
if values:
stats = {
"window_id": window.window_id,
"event_count": len(events),
"avg_value": statistics.mean(values),
"min_value": min(values),
"max_value": max(values),
"std_dev": statistics.stdev(values) if len(values) > 1 else 0,
"time_span": (
events[-1].timestamp - events[0].timestamp
).total_seconds(),
}
results.append(stats)
return results
async def _process_batch(self) -> list[dict[str, Any]]:
"""Process accumulated micro-batch."""
# Reset queue size
self.processing_queue_size = 0
# Process batch (placeholder implementation)
return [
{"batch_processed": True, "processed_at": datetime.now(UTC).isoformat()},
]
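    # A fuller micro-batch path (sketch, not implemented here) would buffer
    # the events themselves rather than only a counter, flush on batch_size
    # or batch_timeout, and run the registered processors over the whole
    # batch in one pass.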
def _should_process_event(
self,
event: RealTimeEvent,
processor: StreamProcessor,
) -> bool:
"""Check if event should be processed by processor."""
# Check if event stream matches processor input streams
event_stream = f"device_stream_{event.source_device}"
return event_stream in processor.input_streams or "*" in processor.input_streams
async def _trigger_event_handlers(self, event: RealTimeEvent) -> None:
"""Trigger registered event handlers."""
handlers = self.event_handlers.get(event.event_type, [])
for handler in handlers:
try:
await handler(event)
except Exception as e:
logger.error(f"Event handler failed: {e!s}")
async def _check_automation_triggers(self, event: RealTimeEvent) -> None:
"""Check and execute automation triggers."""
for trigger_id, trigger_config in self.automation_triggers.items():
try:
if await self._evaluate_trigger_condition(event, trigger_config):
await self._execute_automation_action(trigger_config)
except Exception as e:
logger.error(f"Automation trigger {trigger_id} failed: {e!s}")
async def _evaluate_trigger_condition(
self,
event: RealTimeEvent,
trigger_config: dict[str, Any],
) -> bool:
"""Evaluate if trigger condition is met."""
# Simple condition evaluation (can be extended)
condition = trigger_config.get("condition", {})
        # Event-type mismatch rejects the trigger outright
if (
"event_type" in condition
and event.event_type.value != condition["event_type"]
):
return False
if "device_id" in condition and event.source_device != condition["device_id"]:
return False
if "threshold" in condition and "value" in event.data:
threshold = condition["threshold"]
value = event.data["value"]
operator = condition.get("operator", ">=")
if (
(operator == ">=" and value < threshold)
or (operator == "<=" and value > threshold)
or (operator == "==" and value != threshold)
):
return False
return True
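    # Illustrative trigger_config shape, matching the condition keys read
    # above and the action keys read below (values are examples only):
    # {
    #     "condition": {
    #         "event_type": "sensor_reading",
    #         "device_id": "temp_sensor_1",
    #         "threshold": 30.0,
    #         "operator": ">=",
    #     },
    #     "action": {
    #         "type": "device_control",
    #         "device_id": "cooling_fan_1",
    #         "command": "turn_on",
    #     },
    # }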
async def _execute_automation_action(self, trigger_config: dict[str, Any]) -> None:
"""Execute automation action."""
action = trigger_config.get("action", {})
action_type = action.get("type", "log")
if action_type == "log":
logger.info(
f"Automation triggered: {action.get('message', 'Action executed')}",
)
elif action_type == "device_control":
# Trigger device control action
device_id = action.get("device_id")
command = action.get("command")
logger.info(f"Device control triggered: {device_id} -> {command}")
    async def _process_pending_events(self) -> None:
        """Process any pending events in the background."""
        # Snapshot the streams so handlers that create new streams mid-drain
        # cannot invalidate the iteration
        for event_queue in list(self.event_streams.values()):
            if event_queue:
                # Process up to 10 events per cycle to avoid blocking
                for _ in range(min(10, len(event_queue))):
                    if event_queue:
                        event = event_queue.popleft()
                        await self._process_event_background(event)
    async def _process_event_background(self, event: RealTimeEvent) -> None:
        """Process event in background without blocking."""
        try:
            # Events already handled synchronously in process_event are
            # skipped so handlers and triggers fire at most once per event
            if event.metadata.get("handled"):
                return
            event.metadata["handled"] = True
            # Lightweight background processing
            await self._trigger_event_handlers(event)
            await self._check_automation_triggers(event)
        except Exception as e:
            logger.error(f"Background event processing failed: {e!s}")
    async def _update_metrics(self) -> None:
        """Update real-time processing metrics."""
        now = datetime.now(UTC)
        time_diff = (now - self.last_metrics_update).total_seconds()
        if time_diff >= 1.0:  # Update every second
            # Events per second, from the count processed since the last update
            recent_count = self.total_events_processed - self._events_at_last_update
            self.events_per_second = recent_count / max(time_diff, 1.0)
            self._events_at_last_update = self.total_events_processed
            # Calculate latencies
            if self.latency_history:
                self.average_latency = statistics.mean(self.latency_history)
                self.peak_latency = max(self.latency_history)
            # Store throughput history
            self.throughput_history.append(self.events_per_second)
            self.last_metrics_update = now
    async def _cleanup_old_data(self) -> None:
        """Clean up old data to prevent memory leaks."""
        cutoff_time = datetime.now(UTC) - timedelta(hours=1)
        # Drop events older than the cutoff from every stream
        for event_queue in self.event_streams.values():
            while event_queue and event_queue[0].timestamp < cutoff_time:
                event_queue.popleft()
        # Trim each window to its size/time constraints
        for window in list(self.stream_windows.values()):
            window._maintain_window_size()
def register_event_handler(self, event_type: EventType, handler: Callable) -> None:
"""Register event handler for specific event type."""
self.event_handlers[event_type].append(handler)
def register_automation_trigger(
self,
trigger_id: str,
trigger_config: dict[str, Any],
) -> None:
"""Register automation trigger with conditions and actions."""
self.automation_triggers[trigger_id] = trigger_config
async def get_processing_metrics(self) -> dict[str, Any]:
"""Get real-time processing metrics and performance statistics."""
return {
"total_events_processed": self.total_events_processed,
"events_per_second": self.events_per_second,
"average_latency_ms": self.average_latency,
"peak_latency_ms": self.peak_latency,
"processing_queue_size": self.processing_queue_size,
"active_streams": len(self.event_streams),
"registered_processors": len(self.stream_processors),
"active_windows": len(self.stream_windows),
"error_count": self.error_count,
"processing_mode": self.processing_mode.value,
"max_processing_latency": self.max_processing_latency,
"registered_event_handlers": sum(
len(handlers) for handlers in self.event_handlers.values()
),
"automation_triggers": len(self.automation_triggers),
}
# Helper functions for real-time processing
def create_sensor_event(
device_id: DeviceId,
sensor_data: dict[str, Any],
priority: int = 5,
) -> RealTimeEvent:
"""Create sensor reading event."""
return RealTimeEvent(
event_id=str(uuid.uuid4()),
event_type=EventType.SENSOR_READING,
source_device=device_id,
timestamp=datetime.now(UTC),
data=sensor_data,
priority=priority,
)
def create_threshold_processor(
threshold: float,
comparison: str = ">=",
) -> StreamProcessor:
"""Create threshold-based filter processor."""
async def threshold_filter(event: RealTimeEvent) -> bool:
value = event.data.get("value", 0)
        # Map comparison symbols to their predicate functions
comparison_ops = {
">=": lambda v, t: v >= t,
"<=": lambda v, t: v <= t,
"==": lambda v, t: v == t,
}
op_func = comparison_ops.get(comparison)
return op_func(value, threshold) if op_func else False
return StreamProcessor(
processor_id=f"threshold_{comparison}_{threshold}",
operation_type=StreamOperationType.FILTER,
operation_function=threshold_filter,
input_streams=["*"],
output_streams=["threshold_alerts"],
)
def create_enrichment_processor(enrichment_data: dict[str, Any]) -> StreamProcessor:
"""Create data enrichment processor."""
async def enrich_event(event: RealTimeEvent) -> RealTimeEvent:
enriched_data = event.data.copy()
enriched_data.update(enrichment_data)
return RealTimeEvent(
event_id=f"enriched_{event.event_id}",
event_type=event.event_type,
source_device=event.source_device,
timestamp=event.timestamp,
data=enriched_data,
priority=event.priority,
correlation_id=event.event_id,
)
return StreamProcessor(
processor_id=f"enricher_{uuid.uuid4().hex[:8]}",
operation_type=StreamOperationType.ENRICH,
operation_function=enrich_event,
input_streams=["*"],
output_streams=["enriched_stream"],
)
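# Minimal end-to-end sketch (illustrative; assumes the package imports above
# resolve and that DeviceId accepts a plain string): wire a threshold filter
# and a logging trigger into an engine, then feed one synthetic reading through.
async def _example_pipeline() -> None:
    engine = RealTimeProcessor()
    await engine.register_stream_processor(create_threshold_processor(30.0, ">="))
    engine.register_automation_trigger(
        "overheat_alert",
        {
            "condition": {"event_type": "sensor_reading", "threshold": 30.0},
            "action": {"type": "log", "message": "Overheat detected"},
        },
    )
    event = create_sensor_event("temp_sensor_1", {"value": 31.5}, priority=8)
    result = await engine.process_event(event)
    if result.is_success():
        logger.info("Metrics: %s", await engine.get_processing_metrics())
if __name__ == "__main__":
    asyncio.run(_example_pipeline())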